diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2020-09-09 20:56:30 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-09 20:56:30 +0200 |
commit | 71e37c26157c1ff16e3d5474dd119ea61ac84627 (patch) | |
tree | 749cbc304ec1757ab0a5950250d01755fac1ee09 | |
parent | 71051f2a92c3b79cc69b99e605bdd4b9b0e58fb8 (diff) |
Revert "Revert "Revert "Revert "Balder/group commits to tls 2 remaining rebased""""
24 files changed, 316 insertions, 145 deletions
diff --git a/fnet/src/vespa/fnet/task.cpp b/fnet/src/vespa/fnet/task.cpp index 0888cba6582..f5263450e6a 100644 --- a/fnet/src/vespa/fnet/task.cpp +++ b/fnet/src/vespa/fnet/task.cpp @@ -14,9 +14,7 @@ FNET_Task::FNET_Task(FNET_Scheduler *scheduler) } -FNET_Task::~FNET_Task() -{ -} +FNET_Task::~FNET_Task() = default; void diff --git a/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.cpp index 85644c2111a..348adf5bf41 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.cpp @@ -17,7 +17,7 @@ TransLogServerMetrics::DomainMetrics::DomainMetrics(metrics::MetricSet *parent, { } -TransLogServerMetrics::DomainMetrics::~DomainMetrics() {} +TransLogServerMetrics::DomainMetrics::~DomainMetrics() = default; void TransLogServerMetrics::DomainMetrics::update(const DomainInfo &stats) @@ -66,7 +66,7 @@ TransLogServerMetrics::TransLogServerMetrics(metrics::MetricSet *parent) { } -TransLogServerMetrics::~TransLogServerMetrics() { } +TransLogServerMetrics::~TransLogServerMetrics() = default; void TransLogServerMetrics::update(const DomainStats &stats) diff --git a/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.h index 830d643e93e..727b0c6304d 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.h @@ -3,7 +3,7 @@ #pragma once #include <vespa/metrics/metrics.h> -#include <vespa/searchlib/transactionlog/domain.h> +#include <vespa/searchlib/transactionlog/domainconfig.h> namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index b297dd860ea..c40f1263324 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -22,6 +22,7 @@ #include <vespa/searchcore/proton/summaryengine/docsum_by_slime.h> #include <vespa/searchcore/proton/matchengine/matchengine.h> #include <vespa/searchlib/transactionlog/trans_log_server_explorer.h> +#include <vespa/searchlib/transactionlog/translogserverapp.h> #include <vespa/searchlib/util/fileheadertk.h> #include <vespa/searchlib/common/packets.h> #include <vespa/document/base/exceptions.h> diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 6b561ad5deb..55fd3594463 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -19,7 +19,6 @@ #include <vespa/searchcore/proton/persistenceengine/persistenceengine.h> #include <vespa/searchlib/common/fileheadercontext.h> #include <vespa/searchlib/engine/monitorapi.h> -#include <vespa/searchlib/transactionlog/translogserverapp.h> #include <vespa/vespalib/net/component_config_producer.h> #include <vespa/vespalib/net/generic_state_handler.h> #include <vespa/vespalib/net/json_get_handler.h> @@ -31,7 +30,7 @@ #include <shared_mutex> namespace vespalib { class StateServer; } - +namespace search::transactionlog { class TransLogServerApp; } namespace proton { class DiskMemUsageSampler; @@ -52,12 +51,12 @@ class Proton : public IProtonConfigurerOwner, public vespalib::StateExplorer { private: - typedef search::transactionlog::TransLogServerApp TLS; - typedef search::engine::MonitorRequest MonitorRequest; - typedef search::engine::MonitorReply MonitorReply; - typedef search::engine::MonitorClient MonitorClient; - typedef std::map<DocTypeName, DocumentDB::SP> DocumentDBMap; - typedef BootstrapConfig::ProtonConfigSP ProtonConfigSP; + using TLS = search::transactionlog::TransLogServerApp; + using MonitorRequest = search::engine::MonitorRequest; + using MonitorReply = search::engine::MonitorReply; + using MonitorClient = search::engine::MonitorClient; + using DocumentDBMap = std::map<DocTypeName, DocumentDB::SP>; + using ProtonConfigSP = BootstrapConfig::ProtonConfigSP; using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>; using BucketSpace = document::BucketSpace; @@ -91,7 +90,7 @@ private: MetricsUpdateHook _metricsHook; std::unique_ptr<MetricsEngine> _metricsEngine; ProtonFileHeaderContext _fileHeaderContext; - TLS::UP _tls; + std::unique_ptr<TLS> _tls; std::unique_ptr<DiskMemUsageSampler> _diskMemUsageSampler; PersistenceEngine::UP _persistenceEngine; DocumentDBMap _documentDBMap; diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index b7eb56d1fd9..a20e0cc3aaa 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -256,7 +256,7 @@ fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().data(), b.getHandle().size()))); EXPECT_EXCEPTION(s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())), std::runtime_error, - "commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."); + "commit failed with code -2. server says: Exception during commit on " + name + " : Incoming serial number(1) must be bigger than the last one (3)."); EXPECT_EQUAL(a.size(), 1u); EXPECT_EQUAL(a.range().from(), 1u); EXPECT_EQUAL(a.range().to(), 1u); diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def index f822fc80fc1..f8dcf985278 100644 --- a/searchlib/src/vespa/searchlib/config/translogserver.def +++ b/searchlib/src/vespa/searchlib/config/translogserver.def @@ -24,15 +24,15 @@ maxthreads int default=4 restart crcmethod enum {ccitt_crc32, xxh64} default=xxh64 ## Control compression type. -compression.type enum {NONE, NONE_MULTI, LZ4, ZSTD} default=LZ4 +compression.type enum {NONE, NONE_MULTI, LZ4, ZSTD} default=NONE ## Control compression level ## LZ4 has normal range 1..9 while ZSTD has range 1..19 ## 9 is a reasonable default for both -compression.level int default=9 +compression.level int default=3 ## How large a chunk can grow in memory before beeing flushed chunk.sizelimit int default = 256000 # 256k ## How long a chunk can reside in memory befor ebeeing flushed to disk. -chunk.agelimit double default = 0.010 # 10 milliseconds +chunk.agelimit double default = 0.004 # 4 milliseconds seems to be the sweetspot for the dynamic throttling algorithm. diff --git a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt index 4ead34552bd..5dca84a26c1 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt @@ -4,6 +4,7 @@ vespa_add_library(searchlib_transactionlog OBJECT chunks.cpp common.cpp domain.cpp + domainconfig.cpp domainpart.cpp ichunk.cpp nosyncproxy.cpp diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index 0deceb2668a..5cb1d67d525 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -90,8 +90,17 @@ int makeDirectory(const char * dir); class Writer { public: using DoneCallback = std::shared_ptr<IDestructorCallback>; - virtual ~Writer() { } + virtual ~Writer() = default; virtual void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) = 0; }; +class Destination { +public: + virtual ~Destination() = default; + virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0; + virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0; + virtual bool connected() const = 0; + virtual bool ok() const = 0; +}; + } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 6b47393336a..45afc0c3ee3 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "domain.h" +#include "domainpart.h" +#include "session.h" #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/io/fileutil.h> @@ -15,10 +17,11 @@ LOG_SETUP(".transactionlog.domain"); using vespalib::string; -using vespalib::make_string; +using vespalib::make_string_short::fmt; using vespalib::LockGuard; using vespalib::makeTask; using vespalib::makeClosure; +using vespalib::makeLambdaTask; using vespalib::Monitor; using vespalib::MonitorGuard; using search::common::FileHeaderContext; @@ -29,17 +32,10 @@ namespace search::transactionlog { VESPA_THREAD_STACK_TAG(domain_commit_executor); -DomainConfig::DomainConfig() - : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none), - _compressionLevel(9), - _partSizeLimit(0x10000000), // 256M - _chunkSizeLimit(0x40000), // 256k - _chunkAgeLimit(10ms) -{ } - Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext) : _config(cfg), + _currentChunk(std::make_unique<Chunk>()), _lastSerial(0), _singleCommiter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128*1024)), _commitExecutor(commitExecutor), @@ -60,10 +56,10 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & comm { int retval(0); if ((retval = makeDirectory(_baseDir.c_str())) != 0) { - throw runtime_error(make_string("Failed creating basedirectory %s r(%d), e(%d)", _baseDir.c_str(), retval, errno)); + throw runtime_error(fmt("Failed creating basedirectory %s r(%d), e(%d)", _baseDir.c_str(), retval, errno)); } if ((retval = makeDirectory(dir().c_str())) != 0) { - throw runtime_error(make_string("Failed creating domaindir %s r(%d), e(%d)", dir().c_str(), retval, errno)); + throw runtime_error(fmt("Failed creating domaindir %s r(%d), e(%d)", dir().c_str(), retval, errno)); } SerialNumList partIdVector = scanDir(); const int64_t lastPart = partIdVector.empty() ? 0 : partIdVector.back(); @@ -127,7 +123,12 @@ private: bool & _pendingSync; }; -Domain::~Domain() { } +Domain::~Domain() { + MonitorGuard guard(_currentChunkMonitor); + guard.broadcast(); + commitChunk(grabCurrentChunk(guard), guard); + _singleCommiter->shutdown().sync(); +} DomainInfo Domain::getDomainInfo() const @@ -308,10 +309,92 @@ waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) } +Domain::Chunk::Chunk() + : _data(size_t(-1)), + _callBacks(), + _firstArrivalTime() +{} + +Domain::Chunk::~Chunk() = default; + void -Domain::commit(const Packet & packet, Writer::DoneCallback onDone) -{ - (void) onDone; +Domain::Chunk::add(const Packet &packet, Writer::DoneCallback onDone) { + if (_callBacks.empty()) { + _firstArrivalTime = vespalib::steady_clock::now(); + } + _data.merge(packet); + _callBacks.emplace_back(std::move(onDone)); +} + +vespalib::duration +Domain::Chunk::age() const { + if (_callBacks.empty()) { + return 0ms; + } + return (vespalib::steady_clock::now() - _firstArrivalTime); +} + +void +Domain::commit(const Packet & packet, Writer::DoneCallback onDone) { + vespalib::MonitorGuard guard(_currentChunkMonitor); + if (_lastSerial >= packet.range().from()) { + throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").", + packet.range().from(), _lastSerial)); + } else { + _lastSerial = packet.range().to(); + } + _currentChunk->add(packet, std::move(onDone)); + commitIfFull(guard); +} + +void +Domain::commitIfFull(const vespalib::MonitorGuard &guard) { + if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) { + auto completed = grabCurrentChunk(guard); + if (completed) { + commitChunk(std::move(completed), guard); + } + } +} + +std::unique_ptr<Domain::Chunk> +Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { + assert(guard.monitors(_currentChunkMonitor)); + auto chunk = std::move(_currentChunk); + _currentChunk = std::make_unique<Chunk>(); + return chunk; +} + +bool +Domain::commitIfStale() { + vespalib::MonitorGuard guard(_currentChunkMonitor); + return commitIfStale(guard); +} + +bool +Domain::commitIfStale(const vespalib::MonitorGuard & guard) { + assert(guard.monitors(_currentChunkMonitor)); + if ((_currentChunk->age() > _config.getChunkAgeLimit()) && ! _currentChunk->getPacket().empty()) { + return commitChunk(grabCurrentChunk(guard), guard); + } + return false; +} + +bool +Domain::commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) { + assert(chunkOrderGuard.monitors(_currentChunkMonitor)); + if ( ! chunk->getPacket().empty()) { + _singleCommiter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable { + doCommit(std::move(chunk)); + })); + return true; + } + return false; +} + +void +Domain::doCommit(std::unique_ptr<Chunk> chunk) { + const Packet & packet = chunk->getPacket(); DomainPart::SP dp(_parts.rbegin()->second); vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size()); Packet::Entry entry; @@ -331,7 +414,13 @@ Domain::commit(const Packet & packet, Writer::DoneCallback onDone) vespalib::File::sync(dir()); } dp->commit(entry.serial(), packet); + if (_config.getFSyncOnCommit()) { + dp->sync(); + } cleanSessions(); + LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes and age %ld us", + chunk->getNumCallBacks(), chunk->getPacket().size(), + chunk->sizeBytes(), vespalib::count_us(chunk->age())); } bool @@ -356,7 +445,7 @@ Domain::erase(SerialNum to) int Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to, - std::unique_ptr<Session::Destination> dest) + std::unique_ptr<Destination> dest) { assert(this == domain.get()); cleanSessions(); @@ -442,7 +531,7 @@ Domain::scanDir() continue; const char *p = ename + wantPrefixLen + 1; uint64_t num = strtoull(p, NULL, 10); - string checkName = make_string("%s-%016" PRIu64, _name.c_str(), num); + string checkName = fmt("%s-%016" PRIu64, _name.c_str(), num); if (strcmp(checkName.c_str(), ename) != 0) continue; res.push_back(static_cast<SerialNum>(num)); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index a6c6dad5fe8..5172f0508dc 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -1,70 +1,23 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "domainpart.h" -#include "session.h" +#include "domainconfig.h" +#include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/threadexecutor.h> -#include <vespa/vespalib/util/time.h> -#include <vespa/fastos/thread.h> -#include <chrono> +#include <atomic> +namespace search::common { class FileHeaderContext; } namespace search::transactionlog { -class DomainConfig { -public: - using duration = vespalib::duration; - DomainConfig(); - DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; } - DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; } - DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; } - DomainConfig & setChunkAgeLimit(vespalib::duration v) { _chunkAgeLimit = v; return *this; } - DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; } - Encoding getEncoding() const { return _encoding; } - size_t getPartSizeLimit() const { return _partSizeLimit; } - size_t getChunkSizeLimit() const { return _chunkSizeLimit; } - duration getChunkAgeLimit() const { return _chunkAgeLimit; } - uint8_t getCompressionlevel() const { return _compressionLevel; } -private: - Encoding _encoding; - uint8_t _compressionLevel; - size_t _partSizeLimit; - size_t _chunkSizeLimit; - duration _chunkAgeLimit; -}; - -struct PartInfo { - SerialNumRange range; - size_t numEntries; - size_t byteSize; - vespalib::string file; - PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in) - : range(range_in), - numEntries(numEntries_in), - byteSize(byteSize_in), - file(file_in) - {} -}; - -struct DomainInfo { - using DurationSeconds = std::chrono::duration<double>; - SerialNumRange range; - size_t numEntries; - size_t byteSize; - DurationSeconds maxSessionRunTime; - std::vector<PartInfo> parts; - DomainInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, DurationSeconds maxSessionRunTime_in) - : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in), maxSessionRunTime(maxSessionRunTime_in), parts() {} - DomainInfo() - : range(), numEntries(0), byteSize(0), maxSessionRunTime(), parts() {} -}; - -typedef std::map<vespalib::string, DomainInfo> DomainStats; +class DomainPart; +class Session; class Domain { public: using SP = std::shared_ptr<Domain>; using Executor = vespalib::SyncableThreadExecutor; + using DomainPartSP = std::shared_ptr<DomainPart>; Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, const common::FileHeaderContext &fileHeaderContext); @@ -75,12 +28,13 @@ public: bool erase(SerialNum to); void commit(const Packet & packet, Writer::DoneCallback onDone); - int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Session::Destination> dest); + int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Destination> dest); SerialNum begin() const; SerialNum end() const; SerialNum getSynced() const; void triggerSyncNow(); + bool commitIfStale(); bool getMarkedDeleted() const { return _markedDeleted; } void markDeleted() { _markedDeleted = true; } @@ -91,7 +45,7 @@ public: int closeSession(int sessionId); SerialNum findOldestActiveVisit() const; - DomainPart::SP findPart(SerialNum s); + DomainPartSP findPart(SerialNum s); static vespalib::string getDir(const vespalib::string & base, const vespalib::string & domain) { @@ -103,6 +57,26 @@ public: uint64_t size() const; Domain & setConfig(const DomainConfig & cfg); private: + bool commitIfStale(const vespalib::MonitorGuard & guard); + void commitIfFull(const vespalib::MonitorGuard & guard); + class Chunk { + public: + Chunk(); + ~Chunk(); + void add(const Packet & packet, Writer::DoneCallback onDone); + size_t sizeBytes() const { return _data.sizeBytes(); } + const Packet & getPacket() const { return _data; } + vespalib::duration age() const; + size_t getNumCallBacks() const { return _callBacks.size(); } + private: + Packet _data; + std::vector<Writer::DoneCallback> _callBacks; + vespalib::steady_time _firstArrivalTime; + }; + + std::unique_ptr<Chunk> grabCurrentChunk(const vespalib::MonitorGuard & guard); + bool commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard); + void doCommit(std::unique_ptr<Chunk> chunk); SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; size_t byteSize(const vespalib::LockGuard & guard) const; @@ -115,11 +89,12 @@ private: SerialNumList scanDir(); - using SessionList = std::map<int, Session::SP>; - using DomainPartList = std::map<int64_t, DomainPart::SP>; + using SessionList = std::map<int, std::shared_ptr<Session>>; + using DomainPartList = std::map<int64_t, DomainPartSP>; using DurationSeconds = std::chrono::duration<double>; DomainConfig _config; + std::unique_ptr<Chunk> _currentChunk; SerialNum _lastSerial; std::unique_ptr<Executor> _singleCommiter; Executor & _commitExecutor; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainconfig.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainconfig.cpp new file mode 100644 index 00000000000..beac8cf714b --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/domainconfig.cpp @@ -0,0 +1,16 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "domainconfig.h" + +namespace search::transactionlog { + +DomainConfig::DomainConfig() + : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none), + _compressionLevel(9), + _fSyncOnCommit(false), + _partSizeLimit(0x10000000), // 256M + _chunkSizeLimit(0x40000), // 256k + _chunkAgeLimit(10ms) +{ } + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainconfig.h b/searchlib/src/vespa/searchlib/transactionlog/domainconfig.h new file mode 100644 index 00000000000..ada1e20e095 --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/domainconfig.h @@ -0,0 +1,63 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "ichunk.h" +#include <vespa/vespalib/util/time.h> +#include <map> + +namespace search::transactionlog { + +class DomainConfig { +public: + using duration = vespalib::duration; + DomainConfig(); + DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; } + DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; } + DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; } + DomainConfig & setChunkAgeLimit(vespalib::duration v) { _chunkAgeLimit = v; return *this; } + DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; } + DomainConfig & setFSyncOnCommit(bool v) { _fSyncOnCommit = v; return *this; } + Encoding getEncoding() const { return _encoding; } + size_t getPartSizeLimit() const { return _partSizeLimit; } + size_t getChunkSizeLimit() const { return _chunkSizeLimit; } + duration getChunkAgeLimit() const { return _chunkAgeLimit; } + uint8_t getCompressionlevel() const { return _compressionLevel; } + bool getFSyncOnCommit() const { return _fSyncOnCommit; } +private: + Encoding _encoding; + uint8_t _compressionLevel; + bool _fSyncOnCommit; + size_t _partSizeLimit; + size_t _chunkSizeLimit; + duration _chunkAgeLimit; +}; + +struct PartInfo { + SerialNumRange range; + size_t numEntries; + size_t byteSize; + vespalib::string file; + PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in) + : range(range_in), + numEntries(numEntries_in), + byteSize(byteSize_in), + file(file_in) + {} +}; + +struct DomainInfo { + using DurationSeconds = std::chrono::duration<double>; + SerialNumRange range; + size_t numEntries; + size_t byteSize; + DurationSeconds maxSessionRunTime; + std::vector<PartInfo> parts; + DomainInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, DurationSeconds maxSessionRunTime_in) + : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in), maxSessionRunTime(maxSessionRunTime_in), parts() {} + DomainInfo() + : range(), numEntries(0), byteSize(0), maxSessionRunTime(), parts() {} +}; + +using DomainStats = std::map<vespalib::string, DomainInfo>; + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index 96e75c35a81..40accb0057d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -252,7 +252,7 @@ DomainPart::buildPacketMapping(bool allowTruncate) DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding, uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) - : _encoding(encoding.getCrc(), Encoding::Compression::none), //TODO We do not yet support compression + : _encoding(encoding), _compressionLevel(compressionLevel), _lock(), _fileLock(), @@ -429,7 +429,8 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) } } -void DomainPart::sync() +void +DomainPart::sync() { SerialNum syncSerial(0); { diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h index 5256b731125..31d6938b654 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h @@ -15,12 +15,10 @@ namespace search::common { class FileHeaderContext; } namespace search::transactionlog { class DomainPart { -private: - DomainPart(const DomainPart &); - DomainPart& operator=(const DomainPart &); - public: - typedef std::shared_ptr<DomainPart> SP; + using SP = std::shared_ptr<DomainPart>; + DomainPart(const DomainPart &) = delete; + DomainPart& operator=(const DomainPart &) = delete; DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding, uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h index 5e44815cb1b..854227cfc9c 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -3,9 +3,6 @@ #pragma once #include "common.h" -#include <memory> - -namespace vespalib { class nbostream; } namespace search::transactionlog { diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp index dda840808ce..c91b719be37 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "session.h" #include "domain.h" +#include "domainpart.h" #include <vespa/fastlib/io/bufferedfile.h> #include <vespa/log/log.h> diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h index 9b8d23371e8..ddbe218ed4e 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.h +++ b/searchlib/src/vespa/searchlib/transactionlog/session.h @@ -23,14 +23,6 @@ private: using time_point = std::chrono::time_point<std::chrono::steady_clock>; public: - class Destination { - public: - virtual ~Destination() = default; - virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0; - virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0; - virtual bool connected() const = 0; - virtual bool ok() const = 0; - }; typedef std::shared_ptr<Session> SP; Session(const Session &) = delete; Session & operator = (const Session &) = delete; diff --git a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp index 1d1edbed658..bdf12ab64e8 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "trans_log_server_explorer.h" +#include "translogserver.h" +#include "domain.h" #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/util/time.h> #include <vespa/fastos/file.h> @@ -44,6 +46,7 @@ struct DomainExplorer : vespalib::StateExplorer { } // namespace search::transactionlog::<unnamed> +TransLogServerExplorer::~TransLogServerExplorer() = default; void TransLogServerExplorer::get_state(const Inserter &inserter, bool full) const { diff --git a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h index 65d3a687bc9..66fb1698104 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h +++ b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h @@ -2,24 +2,27 @@ #pragma once -#include "translogserver.h" #include <vespa/vespalib/net/state_explorer.h> namespace search::transactionlog { +class TransLogServer; + /** * Class used to explore the state of a transaction log server. */ class TransLogServerExplorer : public vespalib::StateExplorer { private: - TransLogServer::SP _server; + using TransLogServerSP = std::shared_ptr<TransLogServer>; + TransLogServerSP _server; public: - TransLogServerExplorer(TransLogServer::SP server) : _server(std::move(server)) {} - virtual void get_state(const vespalib::slime::Inserter &inserter, bool full) const override; - virtual std::vector<vespalib::string> get_children_names() const override; - virtual std::unique_ptr<StateExplorer> get_child(vespalib::stringref name) const override; + TransLogServerExplorer(TransLogServerSP server) : _server(std::move(server)) {} + ~TransLogServerExplorer() override; + void get_state(const vespalib::slime::Inserter &inserter, bool full) const override; + std::vector<vespalib::string> get_children_names() const override; + std::unique_ptr<StateExplorer> get_child(vespalib::stringref name) const override; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index edfbf846688..15ce2f537ea 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -1,9 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogserver.h" +#include "domain.h" #include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/io/fileutil.h> -#include <vespa/vespalib/util/time.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/frt/rpcrequest.h> @@ -42,7 +42,6 @@ public: void PerformTask() override; }; - SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain, const TransLogServer::Session::SP &session, SerialNum syncTo) : FNET_Task(supervisor->GetScheduler()), @@ -53,10 +52,8 @@ SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const { } - SyncHandler::~SyncHandler() = default; - void SyncHandler::PerformTask() { @@ -100,6 +97,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con _threadPool(std::make_unique<FastOS_ThreadPool>(1024*120)), _transport(std::make_unique<FNET_Transport>()), _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), + _staleCommitThread(), _domains(), _reqQ(), _fileHeaderContext(fileHeaderContext) @@ -144,12 +142,19 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con throw std::runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno)); } start(*_threadPool); + _staleCommitThread = std::make_unique<std::thread>([this]() { + while (running()) { + std::this_thread::sleep_for(getChunkAgeLimit()); + commitIfStale(); + } + }); } TransLogServer::~TransLogServer() { stop(); join(); + _staleCommitThread->join(); _commitExecutor.shutdown(); _commitExecutor.sync(); _sessionExecutor.shutdown(); @@ -207,10 +212,16 @@ TransLogServer::run() LOG(info, "TLS Stopped"); } +vespalib::duration +TransLogServer::getChunkAgeLimit() const +{ + Guard domainGuard(_domainMutex); + return _domainConfig.getChunkAgeLimit(); +} TransLogServer & TransLogServer::setDomainConfig(const DomainConfig & cfg) { - Guard domainGuard(_lock); + Guard domainGuard(_domainMutex); _domainConfig = cfg; for(auto &domain: _domains) { domain.second->setConfig(cfg); @@ -218,11 +229,21 @@ TransLogServer::setDomainConfig(const DomainConfig & cfg) { return *this; } +bool +TransLogServer::commitIfStale() { + MonitorGuard domainMonitor(_domainMutex); + bool committedAnything(false); + for (const auto &domain : _domains) { + committedAnything = committedAnything || domain.second->commitIfStale(); + } + return committedAnything; +} + DomainStats TransLogServer::getDomainStats() const { DomainStats retval; - Guard domainGuard(_lock); + Guard domainGuard(_domainMutex); for (const auto &elem : _domains) { retval[elem.first] = elem.second->getDomainInfo(); } @@ -233,7 +254,7 @@ std::vector<vespalib::string> TransLogServer::getDomainNames() { std::vector<vespalib::string> names; - Guard guard(_lock); + Guard guard(_domainMutex); for(const auto &domain: _domains) { names.push_back(domain.first); } @@ -243,7 +264,7 @@ TransLogServer::getDomainNames() Domain::SP TransLogServer::findDomain(stringref domainName) { - Guard domainGuard(_lock); + Guard domainGuard(_domainMutex); Domain::SP domain; DomainList::iterator found(_domains.find(domainName)); if (found != _domains.end()) { @@ -363,7 +384,7 @@ writeDomainDir(std::lock_guard<std::mutex> &guard, vespalib::File::sync(dir); } -class RPCDestination : public Session::Destination { +class RPCDestination : public Destination { public: RPCDestination(FRT_Supervisor & supervisor, FNET_Connection * connection) : _supervisor(supervisor), _connection(connection), _ok(true) @@ -448,7 +469,7 @@ TransLogServer::createDomain(FRT_RPCRequest *req) try { domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor, _domainConfig, _fileHeaderContext); - Guard domainGuard(_lock); + Guard domainGuard(_domainMutex); _domains[domain->name()] = domain; writeDomainDir(domainGuard, dir(), domainList(), _domains); } catch (const std::exception & e) { @@ -477,12 +498,12 @@ TransLogServer::deleteDomain(FRT_RPCRequest *req) try { if (domain) { domain->markDeleted(); - Guard domainGuard(_lock); + Guard domainGuard(_domainMutex); _domains.erase(domainName); } vespalib::rmdir(Domain::getDir(dir(), domainName), true); vespalib::File::sync(dir()); - Guard domainGuard(_lock); + Guard domainGuard(_domainMutex); writeDomainDir(domainGuard, dir(), domainList(), _domains); } catch (const std::exception & e) { msg = make_string("Failed deleting %s domain. Exception = %s", domainName, e.what()); @@ -523,7 +544,7 @@ TransLogServer::listDomains(FRT_RPCRequest *req) LOG(debug, "listDomains()"); vespalib::string domains; - Guard domainGuard(_lock); + Guard domainGuard(_domainMutex); for(DomainList::const_iterator it(_domains.begin()), mt(_domains.end()); it != mt; it++) { domains += it->second->name(); domains += "\n"; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 3f945977386..8d5d5aa892f 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -1,30 +1,30 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "domain.h" +#include "domainconfig.h" #include <vespa/vespalib/util/document_runnable.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/document/util/queue.h> #include <vespa/fnet/frt/invokable.h> #include <mutex> - class FRT_Supervisor; class FNET_Transport; +class FNET_Task; +namespace std {class thread; } namespace search::common { class FileHeaderContext; } - namespace search::transactionlog { class TransLogServerExplorer; +class Domain; class TransLogServer : public document::Runnable, private FRT_Invokable, public Writer { public: friend class TransLogServerExplorer; - typedef std::unique_ptr<TransLogServer> UP; - typedef std::shared_ptr<TransLogServer> SP; - + using SP = std::shared_ptr<TransLogServer>; + using DomainSP = std::shared_ptr<Domain>; TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, @@ -33,8 +33,10 @@ public: const common::FileHeaderContext &fileHeaderContext); ~TransLogServer() override; DomainStats getDomainStats() const; + bool commitIfStale(); void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override; TransLogServer & setDomainConfig(const DomainConfig & cfg); + vespalib::duration getChunkAgeLimit() const; class Session { @@ -71,13 +73,13 @@ private: void downSession(FRT_RPCRequest *req); std::vector<vespalib::string> getDomainNames(); - Domain::SP findDomain(vespalib::stringref name); + DomainSP findDomain(vespalib::stringref name); vespalib::string dir() const { return _baseDir + "/" + _name; } vespalib::string domainList() const { return dir() + "/" + _name + ".domains"; } static const Session::SP & getSession(FRT_RPCRequest *req); - using DomainList = std::map<vespalib::string, Domain::SP >; + using DomainList = std::map<vespalib::string, DomainSP >; vespalib::string _name; vespalib::string _baseDir; @@ -87,12 +89,15 @@ private: std::unique_ptr<FastOS_ThreadPool> _threadPool; std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _supervisor; + std::unique_ptr<std::thread> _staleCommitThread; DomainList _domains; - mutable std::mutex _lock; // Protects _domains + mutable std::mutex _domainMutex; // Protects _domains + std::condition_variable _domainCondition; std::mutex _fileLock; // Protects the creating and deleting domains including file system operations. document::Queue<FRT_RPCRequest *> _reqQ; const common::FileHeaderContext &_fileHeaderContext; using Guard = std::lock_guard<std::mutex>; + using MonitorGuard = std::unique_lock<std::mutex>; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp index d83623661ff..12c38ab5739 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp @@ -66,7 +66,8 @@ getDomainConfig(const searchlib::TranslogserverConfig & cfg) { .setCompressionLevel(cfg.compression.level) .setPartSizeLimit(cfg.filesizemax) .setChunkSizeLimit(cfg.chunk.sizelimit) - .setChunkAgeLimit(vespalib::from_s(cfg.chunk.agelimit)); + .setChunkAgeLimit(vespalib::from_s(cfg.chunk.agelimit)) + .setFSyncOnCommit(cfg.usefsync); return dcfg; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h index d46805c105c..fb93559c29f 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h @@ -23,11 +23,9 @@ private: void configure(std::unique_ptr<searchlib::TranslogserverConfig> cfg) override ; public: - typedef std::unique_ptr<TransLogServerApp> UP; - TransLogServerApp(const config::ConfigUri & tlsConfigUri, const common::FileHeaderContext &fileHeaderContext); - ~TransLogServerApp(); + ~TransLogServerApp() override; TransLogServer::SP getTransLogServer() const; |