diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-06 22:13:29 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-10 16:28:01 +0200 |
commit | 961e88a3c382a2390262f390b3fb3d554d010d9a (patch) | |
tree | 19ce3dde9c1ecb58d1808240e988cffe5647065c /searchlib | |
parent | f28c605b6ba4e2674bcef0de2a6d477370537268 (diff) |
executor -> sessionExecutor and some minor syntax changes. No Sematics.
Diffstat (limited to 'searchlib')
4 files changed, 48 insertions, 65 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index a324e42ab24..d5c9fa5afcb 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -4,6 +4,7 @@ #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/fastos/file.h> +#include <algorithm> #include <vespa/log/log.h> LOG_SETUP(".transactionlog.domain"); @@ -20,15 +21,11 @@ using std::runtime_error; namespace search::transactionlog { -Domain::Domain(const string &domainName, - const string & baseDir, - vespalib::ThreadStackExecutor & executor, - uint64_t domainPartSize, - bool useFsync, - DomainPart::Crc defaultCrcType, - const FileHeaderContext &fileHeaderContext) : +Domain::Domain(const string &domainName, const string & baseDir, + vespalib::ThreadExecutor & sessionExecutor, uint64_t domainPartSize, bool useFsync, + DomainPart::Crc defaultCrcType, const FileHeaderContext &fileHeaderContext) : _defaultCrcType(defaultCrcType), - _executor(executor), + _sessionExecutor(sessionExecutor), _sessionId(1), _useFsync(useFsync), _syncMonitor(), @@ -54,10 +51,10 @@ Domain::Domain(const string &domainName, const int64_t lastPart = partIdVector.empty() ? 0 : partIdVector.back(); for (const int64_t partId : partIdVector) { if ( partId != -1) { - _executor.execute(makeTask(makeClosure(this, &Domain::addPart, partId, partId == lastPart))); + _sessionExecutor.execute(makeTask(makeClosure(this, &Domain::addPart, partId, partId == lastPart))); } } - _executor.sync(); + _sessionExecutor.sync(); if (_parts.empty() || _parts.crbegin()->second->isClosed()) { _parts[lastPart].reset(new DomainPart(_name, dir(), lastPart, _useFsync, _defaultCrcType, _fileHeaderContext, false)); } @@ -111,8 +108,7 @@ Domain::getDomainInfo() const DomainInfo info(SerialNumRange(begin(guard), end(guard)), size(guard), byteSize(guard)); for (const auto &entry: _parts) { const DomainPart &part = *entry.second; - info.parts.emplace_back(PartInfo(part.range(), part.size(), - part.byteSize(), part.fileName())); + info.parts.emplace_back(PartInfo(part.range(), part.size(), part.byteSize(), part.fileName())); } return info; } @@ -198,7 +194,7 @@ Domain::triggerSyncNow() if (!_pendingSync) { _pendingSync = true; DomainPart::SP dp(_parts.rbegin()->second); - _executor.execute(Sync::UP(new Sync(_syncMonitor, dp, _pendingSync))); + _sessionExecutor.execute(Sync::UP(new Sync(_syncMonitor, dp, _pendingSync))); } } @@ -302,7 +298,7 @@ void Domain::commit(const Packet & packet) } } -bool Domain::erase(const SerialNum & to) +bool Domain::erase(SerialNum to) { bool retval(true); /// Do not erase the last element @@ -320,7 +316,8 @@ bool Domain::erase(const SerialNum & to) return retval; } -int Domain::visit(const Domain::SP & domain, const SerialNum & from, const SerialNum & to, FRT_Supervisor & supervisor, FNET_Connection *conn) +int Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to, + FRT_Supervisor & supervisor, FNET_Connection *conn) { assert(this == domain.get()); cleanSessions(); @@ -354,7 +351,7 @@ int Domain::closeSession(int sessionId) SessionList::iterator found = _sessions.find(sessionId); if (found != _sessions.end()) { retval = 1; - _executor.sync(); + _sessionExecutor.sync(); } } if (retval == 1) { @@ -371,7 +368,7 @@ int Domain::closeSession(int sessionId) return retval; } -int Domain::subscribe(const Domain::SP & domain, const SerialNum & from, FRT_Supervisor & supervisor, FNET_Connection *conn) +int Domain::subscribe(const Domain::SP & domain, SerialNum from, FRT_Supervisor & supervisor, FNET_Connection *conn) { assert(this == domain.get()); cleanSessions(); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index f70ce7654c1..555744f3dac 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -1,9 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/searchlib/transactionlog/domainpart.h> -#include <vespa/searchlib/transactionlog/session.h> -#include <vespa/vespalib/util/threadstackexecutor.h> +#include "domainpart.h" +#include "session.h" +#include <vespa/vespalib/util/threadexecutor.h> namespace search::transactionlog { @@ -36,30 +36,19 @@ class Domain { public: typedef std::shared_ptr<Domain> SP; - Domain(const vespalib::string &name, - const vespalib::string &baseDir, - vespalib::ThreadStackExecutor & executor, - uint64_t domainPartSize, - bool useFsync, - DomainPart::Crc defaultCrcType, - const common::FileHeaderContext &fileHeaderContext); + Domain(const vespalib::string &name, const vespalib::string &baseDir, + vespalib::ThreadExecutor & sessionExecutor, uint64_t domainPartSize, bool useFsync, + DomainPart::Crc defaultCrcType, const common::FileHeaderContext &fileHeaderContext); virtual ~Domain(); DomainInfo getDomainInfo() const; - const vespalib::string & name() const { return _name; } - bool erase(const SerialNum & to); + bool erase(SerialNum to); void commit(const Packet & packet); - int - visit(const Domain::SP & self, - const SerialNum & from, - const SerialNum & to, - FRT_Supervisor & supervisor, - FNET_Connection *conn); - - int subscribe(const Domain::SP & self, const SerialNum & from, FRT_Supervisor & supervisor, FNET_Connection *conn); + int visit(const Domain::SP & self, SerialNum from, SerialNum to, FRT_Supervisor & supervisor, FNET_Connection *conn); + int subscribe(const Domain::SP & self, SerialNum from, FRT_Supervisor & supervisor, FNET_Connection *conn); SerialNum begin() const; SerialNum end() const; @@ -82,7 +71,7 @@ public: return base + "/" + domain; } vespalib::Executor::Task::UP execute(vespalib::Executor::Task::UP task) { - return _executor.execute(std::move(task)); + return _sessionExecutor.execute(std::move(task)); } uint64_t size() const; private: @@ -94,16 +83,16 @@ private: vespalib::string dir() const { return getDir(_baseDir, _name); } void addPart(int64_t partId, bool isLastPart); - typedef std::vector<SerialNum> SerialNumList; + using SerialNumList = std::vector<SerialNum>; SerialNumList scanDir(); - typedef std::map<int, Session::SP > SessionList; - typedef std::map<int64_t, DomainPart::SP > DomainPartList; - typedef vespalib::ThreadStackExecutor Executor; + using SessionList = std::map<int, Session::SP>; + using DomainPartList = std::map<int64_t, DomainPart::SP>; + using Executor = vespalib::ThreadExecutor; DomainPart::Crc _defaultCrcType; - Executor & _executor; + Executor & _sessionExecutor; std::atomic<int> _sessionId; const bool _useFsync; vespalib::Monitor _syncMonitor; @@ -117,7 +106,6 @@ private: vespalib::string _baseDir; const common::FileHeaderContext &_fileHeaderContext; bool _markedDeleted; - bool _urgentSync; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index ad5654978ae..b273f5f092a 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -91,7 +91,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, _domainPartSize(domainPartSize), _useFsync(useFsync), _defaultCrcType(defaultCrcType), - _executor(maxThreads, 128*1024), + _sessionExecutor(maxThreads, 128*1024), _threadPool(8192, 1), _supervisor(std::make_unique<FRT_Supervisor>()), _domains(), @@ -109,7 +109,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, try { Domain::SP domain(new Domain(domainName, dir(), - _executor, + _sessionExecutor, _domainPartSize, _useFsync, _defaultCrcType, @@ -361,7 +361,7 @@ void TransLogServer::createDomain(FRT_RPCRequest *req) try { domain.reset(new Domain(domainName, dir(), - _executor, + _sessionExecutor, _domainPartSize, _useFsync, _defaultCrcType, diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index e7aca212b07..d04fb4f56d1 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -3,6 +3,7 @@ #include "domain.h" #include <vespa/vespalib/util/document_runnable.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/document/util/queue.h> #include <vespa/fnet/frt/invokable.h> #include <mutex> @@ -20,8 +21,7 @@ public: typedef std::unique_ptr<TransLogServer> UP; typedef std::shared_ptr<TransLogServer> SP; - TransLogServer(const vespalib::string &name, - int listenPort, + TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const common::FileHeaderContext &fileHeaderContext, uint64_t domainPartSize=0x10000000, @@ -29,8 +29,6 @@ public: size_t maxThreads=4, DomainPart::Crc defaultCrc=DomainPart::xxh64); virtual ~TransLogServer(); - uint64_t getDomainPartSize() const { return _domainPartSize; } - uint64_t setDomainPartSize(); DomainStats getDomainStats() const; void commit(const vespalib::string & domainName, const Packet & packet) override; @@ -79,20 +77,20 @@ private: static const Session::SP & getSession(FRT_RPCRequest *req); - typedef std::map<vespalib::string, Domain::SP > DomainList; - - vespalib::string _name; - vespalib::string _baseDir; - const uint64_t _domainPartSize; - const bool _useFsync; - const DomainPart::Crc _defaultCrcType; - vespalib::ThreadStackExecutor _executor; - FastOS_ThreadPool _threadPool; - std::unique_ptr<FRT_Supervisor> _supervisor; - DomainList _domains; - mutable std::mutex _lock; // Protects _domains - std::mutex _fileLock; // Protects the creating and deleting domains including file system operations. - document::Queue<FRT_RPCRequest *> _reqQ; + using DomainList = std::map<vespalib::string, Domain::SP >; + + vespalib::string _name; + vespalib::string _baseDir; + const uint64_t _domainPartSize; + const bool _useFsync; + const DomainPart::Crc _defaultCrcType; + vespalib::ThreadStackExecutor _sessionExecutor; + FastOS_ThreadPool _threadPool; + std::unique_ptr<FRT_Supervisor> _supervisor; + DomainList _domains; + mutable std::mutex _lock; // Protects _domains + std::mutex _fileLock; // Protects the creating and deleting domains including file system operations. + document::Queue<FRT_RPCRequest *> _reqQ; const common::FileHeaderContext &_fileHeaderContext; using Guard = std::lock_guard<std::mutex>; }; |