diff options
23 files changed, 180 insertions, 141 deletions
diff --git a/fnet/src/vespa/fnet/task.cpp b/fnet/src/vespa/fnet/task.cpp index 0888cba6582..f5263450e6a 100644 --- a/fnet/src/vespa/fnet/task.cpp +++ b/fnet/src/vespa/fnet/task.cpp @@ -14,9 +14,7 @@ FNET_Task::FNET_Task(FNET_Scheduler *scheduler) } -FNET_Task::~FNET_Task() -{ -} +FNET_Task::~FNET_Task() = default; void diff --git a/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.cpp index 85644c2111a..348adf5bf41 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.cpp @@ -17,7 +17,7 @@ TransLogServerMetrics::DomainMetrics::DomainMetrics(metrics::MetricSet *parent, { } -TransLogServerMetrics::DomainMetrics::~DomainMetrics() {} +TransLogServerMetrics::DomainMetrics::~DomainMetrics() = default; void TransLogServerMetrics::DomainMetrics::update(const DomainInfo &stats) @@ -66,7 +66,7 @@ TransLogServerMetrics::TransLogServerMetrics(metrics::MetricSet *parent) { } -TransLogServerMetrics::~TransLogServerMetrics() { } +TransLogServerMetrics::~TransLogServerMetrics() = default; void TransLogServerMetrics::update(const DomainStats &stats) diff --git a/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.h index 830d643e93e..727b0c6304d 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/trans_log_server_metrics.h @@ -3,7 +3,7 @@ #pragma once #include <vespa/metrics/metrics.h> -#include <vespa/searchlib/transactionlog/domain.h> +#include <vespa/searchlib/transactionlog/domainconfig.h> namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index b297dd860ea..c40f1263324 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -22,6 +22,7 @@ #include <vespa/searchcore/proton/summaryengine/docsum_by_slime.h> #include <vespa/searchcore/proton/matchengine/matchengine.h> #include <vespa/searchlib/transactionlog/trans_log_server_explorer.h> +#include <vespa/searchlib/transactionlog/translogserverapp.h> #include <vespa/searchlib/util/fileheadertk.h> #include <vespa/searchlib/common/packets.h> #include <vespa/document/base/exceptions.h> diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 6b561ad5deb..55fd3594463 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -19,7 +19,6 @@ #include <vespa/searchcore/proton/persistenceengine/persistenceengine.h> #include <vespa/searchlib/common/fileheadercontext.h> #include <vespa/searchlib/engine/monitorapi.h> -#include <vespa/searchlib/transactionlog/translogserverapp.h> #include <vespa/vespalib/net/component_config_producer.h> #include <vespa/vespalib/net/generic_state_handler.h> #include <vespa/vespalib/net/json_get_handler.h> @@ -31,7 +30,7 @@ #include <shared_mutex> namespace vespalib { class StateServer; } - +namespace search::transactionlog { class TransLogServerApp; } namespace proton { class DiskMemUsageSampler; @@ -52,12 +51,12 @@ class Proton : public IProtonConfigurerOwner, public vespalib::StateExplorer { private: - typedef search::transactionlog::TransLogServerApp TLS; - typedef search::engine::MonitorRequest MonitorRequest; - typedef search::engine::MonitorReply MonitorReply; - typedef search::engine::MonitorClient MonitorClient; - typedef std::map<DocTypeName, DocumentDB::SP> DocumentDBMap; - typedef BootstrapConfig::ProtonConfigSP ProtonConfigSP; + using TLS = search::transactionlog::TransLogServerApp; + using MonitorRequest = search::engine::MonitorRequest; + using MonitorReply = search::engine::MonitorReply; + using MonitorClient = search::engine::MonitorClient; + using DocumentDBMap = std::map<DocTypeName, DocumentDB::SP>; + using ProtonConfigSP = BootstrapConfig::ProtonConfigSP; using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>; using BucketSpace = document::BucketSpace; @@ -91,7 +90,7 @@ private: MetricsUpdateHook _metricsHook; std::unique_ptr<MetricsEngine> _metricsEngine; ProtonFileHeaderContext _fileHeaderContext; - TLS::UP _tls; + std::unique_ptr<TLS> _tls; std::unique_ptr<DiskMemUsageSampler> _diskMemUsageSampler; PersistenceEngine::UP _persistenceEngine; DocumentDBMap _documentDBMap; diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index b7eb56d1fd9..a20e0cc3aaa 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -256,7 +256,7 @@ fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().data(), b.getHandle().size()))); EXPECT_EXCEPTION(s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())), std::runtime_error, - "commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."); + "commit failed with code -2. server says: Exception during commit on " + name + " : Incoming serial number(1) must be bigger than the last one (3)."); EXPECT_EQUAL(a.size(), 1u); EXPECT_EQUAL(a.range().from(), 1u); EXPECT_EQUAL(a.range().to(), 1u); diff --git a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt index 4ead34552bd..5dca84a26c1 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt @@ -4,6 +4,7 @@ vespa_add_library(searchlib_transactionlog OBJECT chunks.cpp common.cpp domain.cpp + domainconfig.cpp domainpart.cpp ichunk.cpp nosyncproxy.cpp diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index 0deceb2668a..5cb1d67d525 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -90,8 +90,17 @@ int makeDirectory(const char * dir); class Writer { public: using DoneCallback = std::shared_ptr<IDestructorCallback>; - virtual ~Writer() { } + virtual ~Writer() = default; virtual void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) = 0; }; +class Destination { +public: + virtual ~Destination() = default; + virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0; + virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0; + virtual bool connected() const = 0; + virtual bool ok() const = 0; +}; + } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 6b47393336a..aaf70964068 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "domain.h" +#include "domainpart.h" +#include "session.h" #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/io/fileutil.h> @@ -15,10 +17,11 @@ LOG_SETUP(".transactionlog.domain"); using vespalib::string; -using vespalib::make_string; +using vespalib::make_string_short::fmt; using vespalib::LockGuard; using vespalib::makeTask; using vespalib::makeClosure; +using vespalib::makeLambdaTask; using vespalib::Monitor; using vespalib::MonitorGuard; using search::common::FileHeaderContext; @@ -29,14 +32,6 @@ namespace search::transactionlog { VESPA_THREAD_STACK_TAG(domain_commit_executor); -DomainConfig::DomainConfig() - : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none), - _compressionLevel(9), - _partSizeLimit(0x10000000), // 256M - _chunkSizeLimit(0x40000), // 256k - _chunkAgeLimit(10ms) -{ } - Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext) : _config(cfg), @@ -60,10 +55,10 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & comm { int retval(0); if ((retval = makeDirectory(_baseDir.c_str())) != 0) { - throw runtime_error(make_string("Failed creating basedirectory %s r(%d), e(%d)", _baseDir.c_str(), retval, errno)); + throw runtime_error(fmt("Failed creating basedirectory %s r(%d), e(%d)", _baseDir.c_str(), retval, errno)); } if ((retval = makeDirectory(dir().c_str())) != 0) { - throw runtime_error(make_string("Failed creating domaindir %s r(%d), e(%d)", dir().c_str(), retval, errno)); + throw runtime_error(fmt("Failed creating domaindir %s r(%d), e(%d)", dir().c_str(), retval, errno)); } SerialNumList partIdVector = scanDir(); const int64_t lastPart = partIdVector.empty() ? 0 : partIdVector.back(); @@ -356,7 +351,7 @@ Domain::erase(SerialNum to) int Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to, - std::unique_ptr<Session::Destination> dest) + std::unique_ptr<Destination> dest) { assert(this == domain.get()); cleanSessions(); @@ -442,7 +437,7 @@ Domain::scanDir() continue; const char *p = ename + wantPrefixLen + 1; uint64_t num = strtoull(p, NULL, 10); - string checkName = make_string("%s-%016" PRIu64, _name.c_str(), num); + string checkName = fmt("%s-%016" PRIu64, _name.c_str(), num); if (strcmp(checkName.c_str(), ename) != 0) continue; res.push_back(static_cast<SerialNum>(num)); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index a6c6dad5fe8..9c8b578a2c9 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -1,70 +1,23 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "domainpart.h" -#include "session.h" +#include "domainconfig.h" +#include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/threadexecutor.h> -#include <vespa/vespalib/util/time.h> -#include <vespa/fastos/thread.h> -#include <chrono> +#include <atomic> +namespace search::common { class FileHeaderContext; } namespace search::transactionlog { -class DomainConfig { -public: - using duration = vespalib::duration; - DomainConfig(); - DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; } - DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; } - DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; } - DomainConfig & setChunkAgeLimit(vespalib::duration v) { _chunkAgeLimit = v; return *this; } - DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; } - Encoding getEncoding() const { return _encoding; } - size_t getPartSizeLimit() const { return _partSizeLimit; } - size_t getChunkSizeLimit() const { return _chunkSizeLimit; } - duration getChunkAgeLimit() const { return _chunkAgeLimit; } - uint8_t getCompressionlevel() const { return _compressionLevel; } -private: - Encoding _encoding; - uint8_t _compressionLevel; - size_t _partSizeLimit; - size_t _chunkSizeLimit; - duration _chunkAgeLimit; -}; - -struct PartInfo { - SerialNumRange range; - size_t numEntries; - size_t byteSize; - vespalib::string file; - PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in) - : range(range_in), - numEntries(numEntries_in), - byteSize(byteSize_in), - file(file_in) - {} -}; - -struct DomainInfo { - using DurationSeconds = std::chrono::duration<double>; - SerialNumRange range; - size_t numEntries; - size_t byteSize; - DurationSeconds maxSessionRunTime; - std::vector<PartInfo> parts; - DomainInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, DurationSeconds maxSessionRunTime_in) - : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in), maxSessionRunTime(maxSessionRunTime_in), parts() {} - DomainInfo() - : range(), numEntries(0), byteSize(0), maxSessionRunTime(), parts() {} -}; - -typedef std::map<vespalib::string, DomainInfo> DomainStats; +class DomainPart; +class Session; class Domain { public: using SP = std::shared_ptr<Domain>; using Executor = vespalib::SyncableThreadExecutor; + using DomainPartSP = std::shared_ptr<DomainPart>; Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, const common::FileHeaderContext &fileHeaderContext); @@ -75,12 +28,13 @@ public: bool erase(SerialNum to); void commit(const Packet & packet, Writer::DoneCallback onDone); - int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Session::Destination> dest); + int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Destination> dest); SerialNum begin() const; SerialNum end() const; SerialNum getSynced() const; void triggerSyncNow(); + bool commitIfStale(); bool getMarkedDeleted() const { return _markedDeleted; } void markDeleted() { _markedDeleted = true; } @@ -91,7 +45,7 @@ public: int closeSession(int sessionId); SerialNum findOldestActiveVisit() const; - DomainPart::SP findPart(SerialNum s); + DomainPartSP findPart(SerialNum s); static vespalib::string getDir(const vespalib::string & base, const vespalib::string & domain) { @@ -115,8 +69,8 @@ private: SerialNumList scanDir(); - using SessionList = std::map<int, Session::SP>; - using DomainPartList = std::map<int64_t, DomainPart::SP>; + using SessionList = std::map<int, std::shared_ptr<Session>>; + using DomainPartList = std::map<int64_t, DomainPartSP>; using DurationSeconds = std::chrono::duration<double>; DomainConfig _config; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainconfig.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainconfig.cpp new file mode 100644 index 00000000000..beac8cf714b --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/domainconfig.cpp @@ -0,0 +1,16 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "domainconfig.h" + +namespace search::transactionlog { + +DomainConfig::DomainConfig() + : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none), + _compressionLevel(9), + _fSyncOnCommit(false), + _partSizeLimit(0x10000000), // 256M + _chunkSizeLimit(0x40000), // 256k + _chunkAgeLimit(10ms) +{ } + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainconfig.h b/searchlib/src/vespa/searchlib/transactionlog/domainconfig.h new file mode 100644 index 00000000000..ada1e20e095 --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/domainconfig.h @@ -0,0 +1,63 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "ichunk.h" +#include <vespa/vespalib/util/time.h> +#include <map> + +namespace search::transactionlog { + +class DomainConfig { +public: + using duration = vespalib::duration; + DomainConfig(); + DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; } + DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; } + DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; } + DomainConfig & setChunkAgeLimit(vespalib::duration v) { _chunkAgeLimit = v; return *this; } + DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; } + DomainConfig & setFSyncOnCommit(bool v) { _fSyncOnCommit = v; return *this; } + Encoding getEncoding() const { return _encoding; } + size_t getPartSizeLimit() const { return _partSizeLimit; } + size_t getChunkSizeLimit() const { return _chunkSizeLimit; } + duration getChunkAgeLimit() const { return _chunkAgeLimit; } + uint8_t getCompressionlevel() const { return _compressionLevel; } + bool getFSyncOnCommit() const { return _fSyncOnCommit; } +private: + Encoding _encoding; + uint8_t _compressionLevel; + bool _fSyncOnCommit; + size_t _partSizeLimit; + size_t _chunkSizeLimit; + duration _chunkAgeLimit; +}; + +struct PartInfo { + SerialNumRange range; + size_t numEntries; + size_t byteSize; + vespalib::string file; + PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in) + : range(range_in), + numEntries(numEntries_in), + byteSize(byteSize_in), + file(file_in) + {} +}; + +struct DomainInfo { + using DurationSeconds = std::chrono::duration<double>; + SerialNumRange range; + size_t numEntries; + size_t byteSize; + DurationSeconds maxSessionRunTime; + std::vector<PartInfo> parts; + DomainInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, DurationSeconds maxSessionRunTime_in) + : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in), maxSessionRunTime(maxSessionRunTime_in), parts() {} + DomainInfo() + : range(), numEntries(0), byteSize(0), maxSessionRunTime(), parts() {} +}; + +using DomainStats = std::map<vespalib::string, DomainInfo>; + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index 96e75c35a81..8855183226d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -409,7 +409,7 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) _sz++; _range.to(entry.serial()); } else { - throw runtime_error(fmt("Incomming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").", + throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").", entry.serial(), _range.to())); } } @@ -429,7 +429,8 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) } } -void DomainPart::sync() +void +DomainPart::sync() { SerialNum syncSerial(0); { @@ -449,7 +450,7 @@ DomainPart::visit(SerialNumRange &r, Packet &packet) { bool retval(false); LockGuard guard(_lock); - LOG(debug, "Visit r(%" PRIu64 ", %" PRIu64 "] Checking %" PRIu64 " packets", + LOG(spam, "Visit r(%" PRIu64 ", %" PRIu64 "] Checking %" PRIu64 " packets", r.from(), r.to(), uint64_t(_packets.size())); if ( ! isClosed() ) { PacketList::const_iterator start(_packets.lower_bound(r.from() + 1)); @@ -474,7 +475,7 @@ DomainPart::visit(SerialNumRange &r, Packet &packet) ((next != end) || ((next != _packets.end()) && ((r.to() + 1) == next->first)))) { packet = start->second; - LOG(debug, "Visit whole packet[%" PRIu64 ", %" PRIu64 "]", packet.range().from(), packet.range().to()); + LOG(spam, "Visit whole packet[%" PRIu64 ", %" PRIu64 "]", packet.range().from(), packet.range().to()); if (next != _packets.end()) { r.from(next->first - 1); retval = true; @@ -484,7 +485,7 @@ DomainPart::visit(SerialNumRange &r, Packet &packet) } else { const nbostream & tmp = start->second.getHandle(); nbostream_longlivedbuf h(tmp.data(), tmp.size()); - LOG(debug, "Visit partial[%" PRIu64 ", %" PRIu64 "] (%zd, %zd, %zd)", + LOG(spam, "Visit partial[%" PRIu64 ", %" PRIu64 "] (%zd, %zd, %zd)", start->second.range().from(), start->second.range().to(), h.rp(), h.size(), h.capacity()); Packet newPacket(h.size()); for (; (h.size() > 0) && (r.from() < r.to()); ) { @@ -546,6 +547,8 @@ DomainPart::write(FastOS_FileInterface &file, const IChunk & chunk) if ( ! file.CheckedWrite(os.data(), os.size()) ) { throw runtime_error(handleWriteError("Failed writing the entry.", file, byteSize(), chunk.range(), os.size())); } + LOG(debug, "Wrote chunk with %zu entries and %zu bytes, range[%" PRIu64 ", %" PRIu64 "] encoding(wanted=%x, real=%x)", + chunk.getEntries().size(), os.size(), chunk.range().from(), chunk.range().to(), _encoding.getRaw(), realEncoding.getRaw()); _writtenSerial = chunk.range().to(); _byteSize.fetch_add(os.size(), std::memory_order_release); } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h index 5256b731125..31d6938b654 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h @@ -15,12 +15,10 @@ namespace search::common { class FileHeaderContext; } namespace search::transactionlog { class DomainPart { -private: - DomainPart(const DomainPart &); - DomainPart& operator=(const DomainPart &); - public: - typedef std::shared_ptr<DomainPart> SP; + using SP = std::shared_ptr<DomainPart>; + DomainPart(const DomainPart &) = delete; + DomainPart& operator=(const DomainPart &) = delete; DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding, uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h index aa2a2085aab..722aae828db 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -3,9 +3,6 @@ #pragma once #include "common.h" -#include <memory> - -namespace vespalib { class nbostream; } namespace search::transactionlog { diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp index dda840808ce..c91b719be37 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "session.h" #include "domain.h" +#include "domainpart.h" #include <vespa/fastlib/io/bufferedfile.h> #include <vespa/log/log.h> diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h index 9b8d23371e8..ddbe218ed4e 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.h +++ b/searchlib/src/vespa/searchlib/transactionlog/session.h @@ -23,14 +23,6 @@ private: using time_point = std::chrono::time_point<std::chrono::steady_clock>; public: - class Destination { - public: - virtual ~Destination() = default; - virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0; - virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0; - virtual bool connected() const = 0; - virtual bool ok() const = 0; - }; typedef std::shared_ptr<Session> SP; Session(const Session &) = delete; Session & operator = (const Session &) = delete; diff --git a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp index 1d1edbed658..bdf12ab64e8 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "trans_log_server_explorer.h" +#include "translogserver.h" +#include "domain.h" #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/util/time.h> #include <vespa/fastos/file.h> @@ -44,6 +46,7 @@ struct DomainExplorer : vespalib::StateExplorer { } // namespace search::transactionlog::<unnamed> +TransLogServerExplorer::~TransLogServerExplorer() = default; void TransLogServerExplorer::get_state(const Inserter &inserter, bool full) const { diff --git a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h index 65d3a687bc9..66fb1698104 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h +++ b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h @@ -2,24 +2,27 @@ #pragma once -#include "translogserver.h" #include <vespa/vespalib/net/state_explorer.h> namespace search::transactionlog { +class TransLogServer; + /** * Class used to explore the state of a transaction log server. */ class TransLogServerExplorer : public vespalib::StateExplorer { private: - TransLogServer::SP _server; + using TransLogServerSP = std::shared_ptr<TransLogServer>; + TransLogServerSP _server; public: - TransLogServerExplorer(TransLogServer::SP server) : _server(std::move(server)) {} - virtual void get_state(const vespalib::slime::Inserter &inserter, bool full) const override; - virtual std::vector<vespalib::string> get_children_names() const override; - virtual std::unique_ptr<StateExplorer> get_child(vespalib::stringref name) const override; + TransLogServerExplorer(TransLogServerSP server) : _server(std::move(server)) {} + ~TransLogServerExplorer() override; + void get_state(const vespalib::slime::Inserter &inserter, bool full) const override; + std::vector<vespalib::string> get_children_names() const override; + std::unique_ptr<StateExplorer> get_child(vespalib::stringref name) const override; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index edfbf846688..93b65002492 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -1,9 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogserver.h" +#include "domain.h" #include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/io/fileutil.h> -#include <vespa/vespalib/util/time.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/frt/rpcrequest.h> @@ -42,7 +42,6 @@ public: void PerformTask() override; }; - SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain, const TransLogServer::Session::SP &session, SerialNum syncTo) : FNET_Task(supervisor->GetScheduler()), @@ -53,10 +52,8 @@ SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const { } - SyncHandler::~SyncHandler() = default; - void SyncHandler::PerformTask() { @@ -207,10 +204,16 @@ TransLogServer::run() LOG(info, "TLS Stopped"); } +vespalib::duration +TransLogServer::getChunkAgeLimit() const +{ + Guard domainGuard(_domainMutex); + return _domainConfig.getChunkAgeLimit(); +} TransLogServer & TransLogServer::setDomainConfig(const DomainConfig & cfg) { - Guard domainGuard(_lock); + Guard domainGuard(_domainMutex); _domainConfig = cfg; for(auto &domain: _domains) { domain.second->setConfig(cfg); @@ -222,7 +225,7 @@ DomainStats TransLogServer::getDomainStats() const { DomainStats retval; - Guard domainGuard(_lock); + Guard domainGuard(_domainMutex); for (const auto &elem : _domains) { retval[elem.first] = elem.second->getDomainInfo(); } @@ -233,7 +236,7 @@ std::vector<vespalib::string> TransLogServer::getDomainNames() { std::vector<vespalib::string> names; - Guard guard(_lock); + Guard guard(_domainMutex); for(const auto &domain: _domains) { names.push_back(domain.first); } @@ -243,7 +246,7 @@ TransLogServer::getDomainNames() Domain::SP TransLogServer::findDomain(stringref domainName) { - Guard domainGuard(_lock); + Guard domainGuard(_domainMutex); Domain::SP domain; DomainList::iterator found(_domains.find(domainName)); if (found != _domains.end()) { @@ -363,7 +366,7 @@ writeDomainDir(std::lock_guard<std::mutex> &guard, vespalib::File::sync(dir); } -class RPCDestination : public Session::Destination { +class RPCDestination : public Destination { public: RPCDestination(FRT_Supervisor & supervisor, FNET_Connection * connection) : _supervisor(supervisor), _connection(connection), _ok(true) @@ -448,7 +451,7 @@ TransLogServer::createDomain(FRT_RPCRequest *req) try { domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor, _domainConfig, _fileHeaderContext); - Guard domainGuard(_lock); + Guard domainGuard(_domainMutex); _domains[domain->name()] = domain; writeDomainDir(domainGuard, dir(), domainList(), _domains); } catch (const std::exception & e) { @@ -477,12 +480,12 @@ TransLogServer::deleteDomain(FRT_RPCRequest *req) try { if (domain) { domain->markDeleted(); - Guard domainGuard(_lock); + Guard domainGuard(_domainMutex); _domains.erase(domainName); } vespalib::rmdir(Domain::getDir(dir(), domainName), true); vespalib::File::sync(dir()); - Guard domainGuard(_lock); + Guard domainGuard(_domainMutex); writeDomainDir(domainGuard, dir(), domainList(), _domains); } catch (const std::exception & e) { msg = make_string("Failed deleting %s domain. Exception = %s", domainName, e.what()); @@ -523,7 +526,7 @@ TransLogServer::listDomains(FRT_RPCRequest *req) LOG(debug, "listDomains()"); vespalib::string domains; - Guard domainGuard(_lock); + Guard domainGuard(_domainMutex); for(DomainList::const_iterator it(_domains.begin()), mt(_domains.end()); it != mt; it++) { domains += it->second->name(); domains += "\n"; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 3f945977386..9351f3244dd 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -1,30 +1,29 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "domain.h" +#include "domainconfig.h" #include <vespa/vespalib/util/document_runnable.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/document/util/queue.h> #include <vespa/fnet/frt/invokable.h> #include <mutex> - class FRT_Supervisor; class FNET_Transport; +namespace std {class thread; } namespace search::common { class FileHeaderContext; } - namespace search::transactionlog { class TransLogServerExplorer; +class Domain; class TransLogServer : public document::Runnable, private FRT_Invokable, public Writer { public: friend class TransLogServerExplorer; - typedef std::unique_ptr<TransLogServer> UP; - typedef std::shared_ptr<TransLogServer> SP; - + using SP = std::shared_ptr<TransLogServer>; + using DomainSP = std::shared_ptr<Domain>; TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, @@ -33,8 +32,10 @@ public: const common::FileHeaderContext &fileHeaderContext); ~TransLogServer() override; DomainStats getDomainStats() const; + bool commitIfStale(); void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override; TransLogServer & setDomainConfig(const DomainConfig & cfg); + vespalib::duration getChunkAgeLimit() const; class Session { @@ -71,13 +72,13 @@ private: void downSession(FRT_RPCRequest *req); std::vector<vespalib::string> getDomainNames(); - Domain::SP findDomain(vespalib::stringref name); + DomainSP findDomain(vespalib::stringref name); vespalib::string dir() const { return _baseDir + "/" + _name; } vespalib::string domainList() const { return dir() + "/" + _name + ".domains"; } static const Session::SP & getSession(FRT_RPCRequest *req); - using DomainList = std::map<vespalib::string, Domain::SP >; + using DomainList = std::map<vespalib::string, DomainSP >; vespalib::string _name; vespalib::string _baseDir; @@ -87,12 +88,15 @@ private: std::unique_ptr<FastOS_ThreadPool> _threadPool; std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _supervisor; + std::unique_ptr<std::thread> _staleCommitThread; DomainList _domains; - mutable std::mutex _lock; // Protects _domains + mutable std::mutex _domainMutex; // Protects _domains + std::condition_variable _domainCondition; std::mutex _fileLock; // Protects the creating and deleting domains including file system operations. document::Queue<FRT_RPCRequest *> _reqQ; const common::FileHeaderContext &_fileHeaderContext; using Guard = std::lock_guard<std::mutex>; + using MonitorGuard = std::unique_lock<std::mutex>; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp index d83623661ff..12c38ab5739 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp @@ -66,7 +66,8 @@ getDomainConfig(const searchlib::TranslogserverConfig & cfg) { .setCompressionLevel(cfg.compression.level) .setPartSizeLimit(cfg.filesizemax) .setChunkSizeLimit(cfg.chunk.sizelimit) - .setChunkAgeLimit(vespalib::from_s(cfg.chunk.agelimit)); + .setChunkAgeLimit(vespalib::from_s(cfg.chunk.agelimit)) + .setFSyncOnCommit(cfg.usefsync); return dcfg; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h index d46805c105c..fb93559c29f 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h @@ -23,11 +23,9 @@ private: void configure(std::unique_ptr<searchlib::TranslogserverConfig> cfg) override ; public: - typedef std::unique_ptr<TransLogServerApp> UP; - TransLogServerApp(const config::ConfigUri & tlsConfigUri, const common::FileHeaderContext &fileHeaderContext); - ~TransLogServerApp(); + ~TransLogServerApp() override; TransLogServer::SP getTransLogServer() const; |