diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-17 19:35:28 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2018-01-10 09:56:22 +0100 |
commit | 2b55ae64f687e83e72c4f5c0062ff482c818d47e (patch) | |
tree | 3325a9816e4728ddfd198ce057dcf32049f97061 /searchlib | |
parent | d993d34963d3e52ce56df43496dd8455239e8247 (diff) |
Wire in grouping in larger packets.
Diffstat (limited to 'searchlib')
5 files changed, 149 insertions, 60 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp index a84e27b2e53..b6009111e3e 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp @@ -83,8 +83,7 @@ Packet::Entry::Entry(SerialNum u, Type t, const vespalib::ConstBufferRef & d) : _type(t), _valid(true), _data(d) -{ -} +{ } bool Packet::add(const Packet::Entry & e) diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 88c2dd9ecc3..d7711c80051 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -20,28 +20,35 @@ using vespalib::MonitorGuard; using search::common::FileHeaderContext; using std::runtime_error; using namespace std::chrono_literals; +using namespace std::chrono; namespace search::transactionlog { -Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor, - Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType, - const FileHeaderContext &fileHeaderContext) : +Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPool & threadPool, + Executor & commitExecutor, Executor & sessionExecutor, uint64_t domainPartSize, + DomainPart::Crc defaultCrcType, const FileHeaderContext &fileHeaderContext) : + _currentChunk(std::make_unique<Chunk>()), _defaultCrcType(defaultCrcType), + _threadPool(threadPool), _commitExecutor(commitExecutor), _sessionExecutor(sessionExecutor), _sessionId(1), _syncMonitor(), _pendingSync(false), _name(domainName), - _domainPartSize(domainPartSize), + _domainPartSizeLimit(domainPartSize), + _chunkSizeLimit(0x40000), + _chunkAgeLimit(10ms), _parts(), _lock(), + _currentChunkMonitor(), _sessionLock(), _sessions(), _maxSessionRunTime(), _baseDir(baseDir), _fileHeaderContext(fileHeaderContext), - _markedDeleted(false) + _markedDeleted(false), + _self(nullptr) { int retval(0); if ((retval = makeDirectory(_baseDir.c_str())) != 0) { @@ -61,8 +68,19 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & comm if (_parts.empty() || _parts.crbegin()->second->isClosed()) { _parts[lastPart].reset(new DomainPart(_name, dir(), lastPart, _defaultCrcType, _fileHeaderContext, false)); } + _self = _threadPool.NewThread(this); + assert(_self); } +void +Domain::Run(FastOS_ThreadInterface *thisThread, void *) { + + while (!thisThread->GetBreakFlag()) { + vespalib::MonitorGuard guard(_currentChunkMonitor); + guard.wait(duration_cast<milliseconds>(_chunkAgeLimit).count()); + commitIfStale(guard); + } +} void Domain::addPart(int64_t partId, bool isLastPart) { DomainPart::SP dp(new DomainPart(_name, dir(), partId, _defaultCrcType, _fileHeaderContext, isLastPart)); if (dp->size() == 0) { @@ -102,7 +120,16 @@ private: bool & _pendingSync; }; -Domain::~Domain() { } +Domain::~Domain() { + if (_self) { + _self->SetBreakFlag(); + { + MonitorGuard guard(_currentChunkMonitor); + guard.broadcast(); + } + _self->Join(); + } +} DomainInfo Domain::getDomainInfo() const @@ -267,7 +294,8 @@ void Domain::cleanSessions() namespace { -void waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) +void +waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) { MonitorGuard guard(syncMonitor); while (pendingSync) { @@ -277,13 +305,66 @@ void waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) } -void Domain::commit(const Packet & packet) +void +Domain::Chunk::add(const Packet &packet, Writer::DoneCallback onDone) { + if (_callBacks.empty()) { + _firstArrivalTime = steady_clock::now(); + } + if ( ! _data.merge(packet) ) { + throw runtime_error(make_string("Failed merging of packet %zu into packet %zu", + packet.range().from(), _data.range().from())); + } + _callBacks.emplace_back(std::move(onDone)); +} + +microseconds +Domain::Chunk::age() const { + if (_callBacks.empty()) { + return 0ms; + } + return duration_cast<microseconds>(steady_clock::now() - _firstArrivalTime); +} + +void +Domain::commit(const Packet & packet, Writer::DoneCallback onDone) { + + std::unique_ptr<Chunk> completed; + vespalib::MonitorGuard guard(_currentChunkMonitor); + _currentChunk->add(packet, std::move(onDone)); + if (_currentChunk->sizeBytes() > _chunkSizeLimit) { + completed = grabCurrentChunk(guard); + } + if (completed) { + commitChunk(std::move(_currentChunk), guard); + } +} + +std::unique_ptr<Domain::Chunk> +Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { + assert(guard.monitors(_currentChunkMonitor)); + auto chunk = std::move(_currentChunk); + _currentChunk = std::make_unique<Chunk>(); + return chunk; +} + +void +Domain::commitIfStale(const vespalib::MonitorGuard & guard) { + assert(guard.monitors(_currentChunkMonitor)); + if (_currentChunk->age() > _chunkAgeLimit) { + commitChunk(grabCurrentChunk(guard), guard); + } +} + +void +Domain::commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) { + assert(chunkOrderGuard.monitors(_currentChunkMonitor)); + const Packet & packet = chunk->getPacket(); DomainPart::SP dp(_parts.rbegin()->second); vespalib::nbostream_longlivedbuf is(packet.getHandle().c_str(), packet.getHandle().size()); Packet::Entry entry; entry.deserialize(is); - if (dp->byteSize() > _domainPartSize) { + if (dp->byteSize() > _domainPartSizeLimit) { waitPendingSync(_syncMonitor, _pendingSync); triggerSyncNow(); waitPendingSync(_syncMonitor, _pendingSync); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index c1ff9157a6f..6d508e6d72e 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -35,22 +35,22 @@ struct DomainInfo { typedef std::map<vespalib::string, DomainInfo> DomainStats; -class Domain +class Domain final : public FastOS_Runnable { public: using SP = std::shared_ptr<Domain>; using Executor = vespalib::ThreadExecutor; - Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor, - Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType, - const common::FileHeaderContext &fileHeaderContext); + Domain(const vespalib::string &name, const vespalib::string &baseDir, FastOS_ThreadPool & threadPool, + Executor & commitExecutor, Executor & sessionExecutor, uint64_t domainPartSize, + DomainPart::Crc defaultCrcType, const common::FileHeaderContext &fileHeaderContext); - virtual ~Domain(); + ~Domain() override; DomainInfo getDomainInfo() const; const vespalib::string & name() const { return _name; } bool erase(SerialNum to); - void commit(const Packet & packet); + void commit(const Packet & packet, Writer::DoneCallback onDone); int visit(const Domain::SP & self, SerialNum from, SerialNum to, FRT_Supervisor & supervisor, FNET_Connection *conn); SerialNum begin() const; @@ -79,6 +79,22 @@ public: uint64_t size() const; private: + void Run(FastOS_ThreadInterface *thisThread, void *arguments) override; + void commitIfStale(const vespalib::MonitorGuard & guard); + class Chunk { + public: + void add(const Packet & packet, Writer::DoneCallback onDone); + size_t sizeBytes() const { return _data.sizeBytes(); } + const Packet & getPacket() const { return _data; } + std::chrono::microseconds age() const; + private: + Packet _data; + std::vector<Writer::DoneCallback> _callBacks; + std::chrono::steady_clock::time_point _firstArrivalTime; + }; + + std::unique_ptr<Chunk> grabCurrentChunk(const vespalib::MonitorGuard & guard); + void commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard); SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; size_t byteSize(const vespalib::LockGuard & guard) const; @@ -95,22 +111,28 @@ private: using DomainPartList = std::map<int64_t, DomainPart::SP>; using DurationSeconds = std::chrono::duration<double>; + std::unique_ptr<Chunk> _currentChunk; DomainPart::Crc _defaultCrcType; + FastOS_ThreadPool & _threadPool; Executor & _commitExecutor; Executor & _sessionExecutor; std::atomic<int> _sessionId; vespalib::Monitor _syncMonitor; bool _pendingSync; vespalib::string _name; - uint64_t _domainPartSize; + const uint64_t _domainPartSizeLimit; + const uint64_t _chunkSizeLimit; + const std::chrono::microseconds _chunkAgeLimit; DomainPartList _parts; vespalib::Lock _lock; + vespalib::Monitor _currentChunkMonitor; vespalib::Lock _sessionLock; SessionList _sessions; DurationSeconds _maxSessionRunTime; vespalib::string _baseDir; const common::FileHeaderContext &_fileHeaderContext; bool _markedDeleted; + FastOS_ThreadInterface * _self; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index 35bdc71c963..06e0820c592 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -31,23 +31,15 @@ void handleSync(FastOS_FileInterface &file) __attribute__ ((noinline)); string -handleWriteError(const char *text, - FastOS_FileInterface &file, - int64_t lastKnownGoodPos, - const Packet::Entry &entry, - int bufLen) __attribute__ ((noinline)); +handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos, + const Packet::Entry &entry, int bufLen) __attribute__ ((noinline)); bool -handleReadError(const char *text, - FastOS_FileInterface &file, - ssize_t len, - ssize_t rlen, - int64_t lastKnownGoodPos, - bool allowTruncate) __attribute__ ((noinline)); +handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen, + int64_t lastKnownGoodPos, bool allowTruncate) __attribute__ ((noinline)); bool -addPacket(Packet &packet, - const Packet::Entry &e) __attribute__ ((noinline)); +addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline)); bool tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline)); @@ -599,10 +591,7 @@ DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry) } bool -DomainPart::read(FastOS_FileInterface &file, - Packet::Entry &entry, - Alloc & buf, - bool allowTruncate) +DomainPart::read(FastOS_FileInterface &file, Packet::Entry &entry, Alloc & buf, bool allowTruncate) { bool retval(true); char tmp[5]; @@ -615,8 +604,8 @@ DomainPart::read(FastOS_FileInterface &file, if ((retval = (rlen == sizeof(tmp)))) { if ( ! (retval = (version == ccitt_crc32) || version == xxh64)) { string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2'," - " got %d from '%s' at position %ld", - version, file.GetFileName(), lastKnownGoodPos)); + " got %d from '%s' at position %ld", + version, file.GetFileName(), lastKnownGoodPos)); if ((version == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) { LOG(warning, "%s", msg.c_str()); return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 4c3c5609a93..cf2b1510526 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -1,5 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogserver.h" +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/exceptions.h> @@ -13,6 +14,8 @@ using vespalib::make_string; using vespalib::stringref; using vespalib::IllegalArgumentException; using search::common::FileHeaderContext; +using std::make_shared; +using std::runtime_error; namespace search::transactionlog { @@ -26,21 +29,16 @@ class SyncHandler : public FNET_Task SerialNum _syncTo; public: - SyncHandler(FRT_Supervisor *supervisor, - FRT_RPCRequest *req,const Domain::SP &domain, - const TransLogServer::Session::SP &session, - SerialNum syncTo); + SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req,const Domain::SP &domain, + const TransLogServer::Session::SP &session, SerialNum syncTo); ~SyncHandler(); void PerformTask() override; }; -SyncHandler::SyncHandler(FRT_Supervisor *supervisor, - FRT_RPCRequest *req, - const Domain::SP &domain, - const TransLogServer::Session::SP &session, - SerialNum syncTo) +SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain, + const TransLogServer::Session::SP &session, SerialNum syncTo) : FNET_Task(supervisor->GetScheduler()), _req(*req), _domain(domain), @@ -50,9 +48,7 @@ SyncHandler::SyncHandler(FRT_Supervisor *supervisor, } -SyncHandler::~SyncHandler() -{ -} +SyncHandler::~SyncHandler() = default; void @@ -95,7 +91,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con _defaultCrcType(defaultCrcType), _commitExecutor(maxThreads, 128*1024), _sessionExecutor(maxThreads, 128*1024), - _threadPool(8192, 1), + _threadPool(0x20000), _supervisor(std::make_unique<FRT_Supervisor>()), _domains(), _reqQ(), @@ -110,8 +106,8 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con domainDir >> domainName; if ( ! domainName.empty()) { try { - auto domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor, - _domainPartSize, _defaultCrcType,_fileHeaderContext); + auto domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor, _sessionExecutor, + _domainPartSize, _defaultCrcType,_fileHeaderContext); _domains[domain->name()] = domain; } catch (const std::exception & e) { LOG(warning, "Failed creating %s domain on startup. Exception = %s", domainName.c_str(), e.what()); @@ -132,13 +128,13 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con } } if ( ! listenOk ) { - throw std::runtime_error(make_string("Failed listening at port %s. Giving up. Requires manual intervention.", listenSpec)); + throw runtime_error(make_string("Failed listening at port %s. Giving up. Requires manual intervention.", listenSpec)); } } else { - throw std::runtime_error(make_string("Failed creating tls dir %s r(%d), e(%d). Requires manual intervention.", dir().c_str(), retval, errno)); + throw runtime_error(make_string("Failed creating tls dir %s r(%d), e(%d). Requires manual intervention.", dir().c_str(), retval, errno)); } } else { - throw std::runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno)); + throw runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno)); } start(_threadPool); } @@ -200,7 +196,7 @@ void TransLogServer::run() } } logMetric(); - } while (running() && !(hasPacket && (req == NULL))); + } while (running() && !(hasPacket && (req == nullptr))); LOG(info, "TLS Stopped"); } @@ -349,8 +345,8 @@ void TransLogServer::createDomain(FRT_RPCRequest *req) Domain::SP domain(findDomain(domainName)); if ( !domain ) { try { - domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor, - _domainPartSize, _defaultCrcType, _fileHeaderContext); + domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor, _sessionExecutor, + _domainPartSize, _defaultCrcType, _fileHeaderContext); { Guard domainGuard(_lock); _domains[domain->name()] = domain; @@ -462,7 +458,7 @@ void TransLogServer::commit(const vespalib::string & domainName, const Packet & (void) done; Domain::SP domain(findDomain(domainName)); if (domain) { - domain->commit(packet); + domain->commit(packet, std::move(done)); } else { throw IllegalArgumentException("Could not find domain " + domainName); } @@ -478,7 +474,9 @@ void TransLogServer::domainCommit(FRT_RPCRequest *req) if (domain) { Packet packet(params[1]._data._buf, params[1]._data._len); try { - domain->commit(packet); + vespalib::Gate gate; + domain->commit(packet, make_shared<GateCallback>(gate)); + gate.await(); ret.AddInt32(0); ret.AddString("ok"); } catch (const std::exception & e) { |