From c3a5d994666e59627e8fb188bb0291e935cbe2da Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 7 Sep 2020 18:51:05 +0200 Subject: Revert "Balder/group commits to tls 2 remaining rebased" --- .../tests/transactionlog/translogclient_test.cpp | 2 +- .../src/vespa/searchlib/config/translogserver.def | 4 +- .../vespa/searchlib/transactionlog/CMakeLists.txt | 1 - .../src/vespa/searchlib/transactionlog/common.h | 11 +- .../src/vespa/searchlib/transactionlog/domain.cpp | 117 +++------------------ .../src/vespa/searchlib/transactionlog/domain.h | 90 ++++++++++------ .../searchlib/transactionlog/domainconfig.cpp | 16 --- .../vespa/searchlib/transactionlog/domainconfig.h | 63 ----------- .../vespa/searchlib/transactionlog/domainpart.cpp | 5 +- .../vespa/searchlib/transactionlog/domainpart.h | 8 +- .../src/vespa/searchlib/transactionlog/ichunk.h | 3 + .../src/vespa/searchlib/transactionlog/session.cpp | 1 - .../src/vespa/searchlib/transactionlog/session.h | 8 ++ .../transactionlog/trans_log_server_explorer.cpp | 3 - .../transactionlog/trans_log_server_explorer.h | 15 ++- .../searchlib/transactionlog/translogserver.cpp | 55 +++------- .../searchlib/transactionlog/translogserver.h | 21 ++-- .../searchlib/transactionlog/translogserverapp.cpp | 3 +- .../searchlib/transactionlog/translogserverapp.h | 4 +- 19 files changed, 129 insertions(+), 301 deletions(-) delete mode 100644 searchlib/src/vespa/searchlib/transactionlog/domainconfig.cpp delete mode 100644 searchlib/src/vespa/searchlib/transactionlog/domainconfig.h (limited to 'searchlib') diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index a20e0cc3aaa..b7eb56d1fd9 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -256,7 +256,7 @@ fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().data(), b.getHandle().size()))); EXPECT_EXCEPTION(s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())), std::runtime_error, - "commit failed with code -2. server says: Exception during commit on " + name + " : Incoming serial number(1) must be bigger than the last one (3)."); + "commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."); EXPECT_EQUAL(a.size(), 1u); EXPECT_EQUAL(a.range().from(), 1u); EXPECT_EQUAL(a.range().to(), 1u); diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def index e2259b8b653..f822fc80fc1 100644 --- a/searchlib/src/vespa/searchlib/config/translogserver.def +++ b/searchlib/src/vespa/searchlib/config/translogserver.def @@ -24,12 +24,12 @@ maxthreads int default=4 restart crcmethod enum {ccitt_crc32, xxh64} default=xxh64 ## Control compression type. -compression.type enum {NONE, NONE_MULTI, LZ4, ZSTD} default=NONE +compression.type enum {NONE, NONE_MULTI, LZ4, ZSTD} default=LZ4 ## Control compression level ## LZ4 has normal range 1..9 while ZSTD has range 1..19 ## 9 is a reasonable default for both -compression.level int default=3 +compression.level int default=9 ## How large a chunk can grow in memory before beeing flushed chunk.sizelimit int default = 256000 # 256k diff --git a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt index 5dca84a26c1..4ead34552bd 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt @@ -4,7 +4,6 @@ vespa_add_library(searchlib_transactionlog OBJECT chunks.cpp common.cpp domain.cpp - domainconfig.cpp domainpart.cpp ichunk.cpp nosyncproxy.cpp diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index 5cb1d67d525..0deceb2668a 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -90,17 +90,8 @@ int makeDirectory(const char * dir); class Writer { public: using DoneCallback = std::shared_ptr; - virtual ~Writer() = default; + virtual ~Writer() { } virtual void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) = 0; }; -class Destination { -public: - virtual ~Destination() = default; - virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0; - virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0; - virtual bool connected() const = 0; - virtual bool ok() const = 0; -}; - } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 2fa1fbdeceb..6b47393336a 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -1,8 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "domain.h" -#include "domainpart.h" -#include "session.h" #include #include #include @@ -17,11 +15,10 @@ LOG_SETUP(".transactionlog.domain"); using vespalib::string; -using vespalib::make_string_short::fmt; +using vespalib::make_string; using vespalib::LockGuard; using vespalib::makeTask; using vespalib::makeClosure; -using vespalib::makeLambdaTask; using vespalib::Monitor; using vespalib::MonitorGuard; using search::common::FileHeaderContext; @@ -32,10 +29,17 @@ namespace search::transactionlog { VESPA_THREAD_STACK_TAG(domain_commit_executor); +DomainConfig::DomainConfig() + : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none), + _compressionLevel(9), + _partSizeLimit(0x10000000), // 256M + _chunkSizeLimit(0x40000), // 256k + _chunkAgeLimit(10ms) +{ } + Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext) : _config(cfg), - _currentChunk(std::make_unique()), _lastSerial(0), _singleCommiter(std::make_unique(1, 128*1024)), _commitExecutor(commitExecutor), @@ -56,10 +60,10 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & comm { int retval(0); if ((retval = makeDirectory(_baseDir.c_str())) != 0) { - throw runtime_error(fmt("Failed creating basedirectory %s r(%d), e(%d)", _baseDir.c_str(), retval, errno)); + throw runtime_error(make_string("Failed creating basedirectory %s r(%d), e(%d)", _baseDir.c_str(), retval, errno)); } if ((retval = makeDirectory(dir().c_str())) != 0) { - throw runtime_error(fmt("Failed creating domaindir %s r(%d), e(%d)", dir().c_str(), retval, errno)); + throw runtime_error(make_string("Failed creating domaindir %s r(%d), e(%d)", dir().c_str(), retval, errno)); } SerialNumList partIdVector = scanDir(); const int64_t lastPart = partIdVector.empty() ? 0 : partIdVector.back(); @@ -123,12 +127,7 @@ private: bool & _pendingSync; }; -Domain::~Domain() { - MonitorGuard guard(_currentChunkMonitor); - guard.broadcast(); - commitChunk(grabCurrentChunk(guard), guard); - _singleCommiter->shutdown().sync(); -} +Domain::~Domain() { } DomainInfo Domain::getDomainInfo() const @@ -309,89 +308,10 @@ waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) } -Domain::Chunk::Chunk() - : _data(size_t(-1)), - _callBacks(), - _firstArrivalTime() -{} - -Domain::Chunk::~Chunk() = default; - -void -Domain::Chunk::add(const Packet &packet, Writer::DoneCallback onDone) { - if (_callBacks.empty()) { - _firstArrivalTime = vespalib::steady_clock::now(); - } - _data.merge(packet); - _callBacks.emplace_back(std::move(onDone)); -} - -vespalib::duration -Domain::Chunk::age() const { - if (_callBacks.empty()) { - return 0ms; - } - return (vespalib::steady_clock::now() - _firstArrivalTime); -} - -void -Domain::commit(const Packet & packet, Writer::DoneCallback onDone) { - vespalib::MonitorGuard guard(_currentChunkMonitor); - if (_lastSerial >= packet.range().from()) { - throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").", - packet.range().from(), _lastSerial)); - } else { - _lastSerial = packet.range().to(); - } - _currentChunk->add(packet, std::move(onDone)); - commitIfFull(guard); -} - -void -Domain::commitIfFull(const vespalib::MonitorGuard &guard) { - if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) { - auto completed = grabCurrentChunk(guard); - if (completed) { - commitChunk(std::move(completed), guard); - } - } -} - -std::unique_ptr -Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { - assert(guard.monitors(_currentChunkMonitor)); - auto chunk = std::move(_currentChunk); - _currentChunk = std::make_unique(); - return chunk; -} - -void -Domain::commitIfStale() { - vespalib::MonitorGuard guard(_currentChunkMonitor); - commitIfStale(guard); -} - -void -Domain::commitIfStale(const vespalib::MonitorGuard & guard) { - assert(guard.monitors(_currentChunkMonitor)); - if ((_currentChunk->age() > _config.getChunkAgeLimit()) && ! _currentChunk->getPacket().empty()) { - commitChunk(grabCurrentChunk(guard), guard); - } -} - void -Domain::commitChunk(std::unique_ptr chunk, const vespalib::MonitorGuard & chunkOrderGuard) { - assert(chunkOrderGuard.monitors(_currentChunkMonitor)); - if ( ! chunk->getPacket().empty()) { - _singleCommiter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable { - doCommit(std::move(chunk)); - })); - } -} - -void -Domain::doCommit(std::unique_ptr chunk) { - const Packet & packet = chunk->getPacket(); +Domain::commit(const Packet & packet, Writer::DoneCallback onDone) +{ + (void) onDone; DomainPart::SP dp(_parts.rbegin()->second); vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size()); Packet::Entry entry; @@ -411,9 +331,6 @@ Domain::doCommit(std::unique_ptr chunk) { vespalib::File::sync(dir()); } dp->commit(entry.serial(), packet); - if (_config.getFSyncOnCommit()) { - dp->sync(); - } cleanSessions(); } @@ -439,7 +356,7 @@ Domain::erase(SerialNum to) int Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to, - std::unique_ptr dest) + std::unique_ptr dest) { assert(this == domain.get()); cleanSessions(); @@ -525,7 +442,7 @@ Domain::scanDir() continue; const char *p = ename + wantPrefixLen + 1; uint64_t num = strtoull(p, NULL, 10); - string checkName = fmt("%s-%016" PRIu64, _name.c_str(), num); + string checkName = make_string("%s-%016" PRIu64, _name.c_str(), num); if (strcmp(checkName.c_str(), ename) != 0) continue; res.push_back(static_cast(num)); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 3f9afb7dde5..a6c6dad5fe8 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -1,23 +1,70 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "domainconfig.h" -#include +#include "domainpart.h" +#include "session.h" #include -#include +#include +#include +#include -namespace search::common { class FileHeaderContext; } namespace search::transactionlog { -class DomainPart; -class Session; +class DomainConfig { +public: + using duration = vespalib::duration; + DomainConfig(); + DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; } + DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; } + DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; } + DomainConfig & setChunkAgeLimit(vespalib::duration v) { _chunkAgeLimit = v; return *this; } + DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; } + Encoding getEncoding() const { return _encoding; } + size_t getPartSizeLimit() const { return _partSizeLimit; } + size_t getChunkSizeLimit() const { return _chunkSizeLimit; } + duration getChunkAgeLimit() const { return _chunkAgeLimit; } + uint8_t getCompressionlevel() const { return _compressionLevel; } +private: + Encoding _encoding; + uint8_t _compressionLevel; + size_t _partSizeLimit; + size_t _chunkSizeLimit; + duration _chunkAgeLimit; +}; + +struct PartInfo { + SerialNumRange range; + size_t numEntries; + size_t byteSize; + vespalib::string file; + PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in) + : range(range_in), + numEntries(numEntries_in), + byteSize(byteSize_in), + file(file_in) + {} +}; + +struct DomainInfo { + using DurationSeconds = std::chrono::duration; + SerialNumRange range; + size_t numEntries; + size_t byteSize; + DurationSeconds maxSessionRunTime; + std::vector parts; + DomainInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, DurationSeconds maxSessionRunTime_in) + : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in), maxSessionRunTime(maxSessionRunTime_in), parts() {} + DomainInfo() + : range(), numEntries(0), byteSize(0), maxSessionRunTime(), parts() {} +}; + +typedef std::map DomainStats; class Domain { public: using SP = std::shared_ptr; using Executor = vespalib::SyncableThreadExecutor; - using DomainPartSP = std::shared_ptr; Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, const common::FileHeaderContext &fileHeaderContext); @@ -28,13 +75,12 @@ public: bool erase(SerialNum to); void commit(const Packet & packet, Writer::DoneCallback onDone); - int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr dest); + int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr dest); SerialNum begin() const; SerialNum end() const; SerialNum getSynced() const; void triggerSyncNow(); - void commitIfStale(); bool getMarkedDeleted() const { return _markedDeleted; } void markDeleted() { _markedDeleted = true; } @@ -45,7 +91,7 @@ public: int closeSession(int sessionId); SerialNum findOldestActiveVisit() const; - DomainPartSP findPart(SerialNum s); + DomainPart::SP findPart(SerialNum s); static vespalib::string getDir(const vespalib::string & base, const vespalib::string & domain) { @@ -57,25 +103,6 @@ public: uint64_t size() const; Domain & setConfig(const DomainConfig & cfg); private: - void commitIfStale(const vespalib::MonitorGuard & guard); - void commitIfFull(const vespalib::MonitorGuard & guard); - class Chunk { - public: - Chunk(); - ~Chunk(); - void add(const Packet & packet, Writer::DoneCallback onDone); - size_t sizeBytes() const { return _data.sizeBytes(); } - const Packet & getPacket() const { return _data; } - vespalib::duration age() const; - private: - Packet _data; - std::vector _callBacks; - vespalib::steady_time _firstArrivalTime; - }; - - std::unique_ptr grabCurrentChunk(const vespalib::MonitorGuard & guard); - void commitChunk(std::unique_ptr chunk, const vespalib::MonitorGuard & chunkOrderGuard); - void doCommit(std::unique_ptr chunk); SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; size_t byteSize(const vespalib::LockGuard & guard) const; @@ -88,12 +115,11 @@ private: SerialNumList scanDir(); - using SessionList = std::map>; - using DomainPartList = std::map; + using SessionList = std::map; + using DomainPartList = std::map; using DurationSeconds = std::chrono::duration; DomainConfig _config; - std::unique_ptr _currentChunk; SerialNum _lastSerial; std::unique_ptr _singleCommiter; Executor & _commitExecutor; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainconfig.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainconfig.cpp deleted file mode 100644 index beac8cf714b..00000000000 --- a/searchlib/src/vespa/searchlib/transactionlog/domainconfig.cpp +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "domainconfig.h" - -namespace search::transactionlog { - -DomainConfig::DomainConfig() - : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none), - _compressionLevel(9), - _fSyncOnCommit(false), - _partSizeLimit(0x10000000), // 256M - _chunkSizeLimit(0x40000), // 256k - _chunkAgeLimit(10ms) -{ } - -} diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainconfig.h b/searchlib/src/vespa/searchlib/transactionlog/domainconfig.h deleted file mode 100644 index ada1e20e095..00000000000 --- a/searchlib/src/vespa/searchlib/transactionlog/domainconfig.h +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "ichunk.h" -#include -#include - -namespace search::transactionlog { - -class DomainConfig { -public: - using duration = vespalib::duration; - DomainConfig(); - DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; } - DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; } - DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; } - DomainConfig & setChunkAgeLimit(vespalib::duration v) { _chunkAgeLimit = v; return *this; } - DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; } - DomainConfig & setFSyncOnCommit(bool v) { _fSyncOnCommit = v; return *this; } - Encoding getEncoding() const { return _encoding; } - size_t getPartSizeLimit() const { return _partSizeLimit; } - size_t getChunkSizeLimit() const { return _chunkSizeLimit; } - duration getChunkAgeLimit() const { return _chunkAgeLimit; } - uint8_t getCompressionlevel() const { return _compressionLevel; } - bool getFSyncOnCommit() const { return _fSyncOnCommit; } -private: - Encoding _encoding; - uint8_t _compressionLevel; - bool _fSyncOnCommit; - size_t _partSizeLimit; - size_t _chunkSizeLimit; - duration _chunkAgeLimit; -}; - -struct PartInfo { - SerialNumRange range; - size_t numEntries; - size_t byteSize; - vespalib::string file; - PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in) - : range(range_in), - numEntries(numEntries_in), - byteSize(byteSize_in), - file(file_in) - {} -}; - -struct DomainInfo { - using DurationSeconds = std::chrono::duration; - SerialNumRange range; - size_t numEntries; - size_t byteSize; - DurationSeconds maxSessionRunTime; - std::vector parts; - DomainInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, DurationSeconds maxSessionRunTime_in) - : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in), maxSessionRunTime(maxSessionRunTime_in), parts() {} - DomainInfo() - : range(), numEntries(0), byteSize(0), maxSessionRunTime(), parts() {} -}; - -using DomainStats = std::map; - -} diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index 40accb0057d..96e75c35a81 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -252,7 +252,7 @@ DomainPart::buildPacketMapping(bool allowTruncate) DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding, uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) - : _encoding(encoding), + : _encoding(encoding.getCrc(), Encoding::Compression::none), //TODO We do not yet support compression _compressionLevel(compressionLevel), _lock(), _fileLock(), @@ -429,8 +429,7 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) } } -void -DomainPart::sync() +void DomainPart::sync() { SerialNum syncSerial(0); { diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h index 31d6938b654..5256b731125 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h @@ -15,10 +15,12 @@ namespace search::common { class FileHeaderContext; } namespace search::transactionlog { class DomainPart { +private: + DomainPart(const DomainPart &); + DomainPart& operator=(const DomainPart &); + public: - using SP = std::shared_ptr; - DomainPart(const DomainPart &) = delete; - DomainPart& operator=(const DomainPart &) = delete; + typedef std::shared_ptr SP; DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding, uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h index 854227cfc9c..5e44815cb1b 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -3,6 +3,9 @@ #pragma once #include "common.h" +#include + +namespace vespalib { class nbostream; } namespace search::transactionlog { diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp index c91b719be37..dda840808ce 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "session.h" #include "domain.h" -#include "domainpart.h" #include #include diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h index ddbe218ed4e..9b8d23371e8 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.h +++ b/searchlib/src/vespa/searchlib/transactionlog/session.h @@ -23,6 +23,14 @@ private: using time_point = std::chrono::time_point; public: + class Destination { + public: + virtual ~Destination() = default; + virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0; + virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0; + virtual bool connected() const = 0; + virtual bool ok() const = 0; + }; typedef std::shared_ptr SP; Session(const Session &) = delete; Session & operator = (const Session &) = delete; diff --git a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp index bdf12ab64e8..1d1edbed658 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.cpp @@ -1,8 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "trans_log_server_explorer.h" -#include "translogserver.h" -#include "domain.h" #include #include #include @@ -46,7 +44,6 @@ struct DomainExplorer : vespalib::StateExplorer { } // namespace search::transactionlog:: -TransLogServerExplorer::~TransLogServerExplorer() = default; void TransLogServerExplorer::get_state(const Inserter &inserter, bool full) const { diff --git a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h index 66fb1698104..65d3a687bc9 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h +++ b/searchlib/src/vespa/searchlib/transactionlog/trans_log_server_explorer.h @@ -2,27 +2,24 @@ #pragma once +#include "translogserver.h" #include namespace search::transactionlog { -class TransLogServer; - /** * Class used to explore the state of a transaction log server. */ class TransLogServerExplorer : public vespalib::StateExplorer { private: - using TransLogServerSP = std::shared_ptr; - TransLogServerSP _server; + TransLogServer::SP _server; public: - TransLogServerExplorer(TransLogServerSP server) : _server(std::move(server)) {} - ~TransLogServerExplorer() override; - void get_state(const vespalib::slime::Inserter &inserter, bool full) const override; - std::vector get_children_names() const override; - std::unique_ptr get_child(vespalib::stringref name) const override; + TransLogServerExplorer(TransLogServer::SP server) : _server(std::move(server)) {} + virtual void get_state(const vespalib::slime::Inserter &inserter, bool full) const override; + virtual std::vector get_children_names() const override; + virtual std::unique_ptr get_child(vespalib::stringref name) const override; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index ec70f32694a..edfbf846688 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -1,9 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogserver.h" -#include "domain.h" #include #include #include +#include #include #include #include @@ -42,6 +42,7 @@ public: void PerformTask() override; }; + SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain, const TransLogServer::Session::SP &session, SerialNum syncTo) : FNET_Task(supervisor->GetScheduler()), @@ -52,8 +53,10 @@ SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const { } + SyncHandler::~SyncHandler() = default; + void SyncHandler::PerformTask() { @@ -72,26 +75,6 @@ SyncHandler::PerformTask() } } -class StaleCommitTask final : public FNET_Task { -public: - StaleCommitTask(FNET_Scheduler * sceduler, TransLogServer & server, double seconds2Wait) - : FNET_Task(sceduler), - _server(server), - _seconds2Wait(seconds2Wait) - {} - ~StaleCommitTask() override = default; - - void PerformTask() override { - if (_server.running()) { - _server.commitIfStale(); - Schedule(_seconds2Wait); - } - } -private: - TransLogServer & _server; - double _seconds2Wait; -}; - } TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, @@ -161,14 +144,10 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con throw std::runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno)); } start(*_threadPool); - double chunkAgeLimit = vespalib::to_s(_domainConfig.getChunkAgeLimit()); - _staleCommitTask = std::make_unique(_supervisor->GetScheduler(), *this, chunkAgeLimit); - _staleCommitTask->ScheduleNow(); } TransLogServer::~TransLogServer() { - _staleCommitTask->Kill(); stop(); join(); _commitExecutor.shutdown(); @@ -231,7 +210,7 @@ TransLogServer::run() TransLogServer & TransLogServer::setDomainConfig(const DomainConfig & cfg) { - Guard domainGuard(_domainMutex); + Guard domainGuard(_lock); _domainConfig = cfg; for(auto &domain: _domains) { domain.second->setConfig(cfg); @@ -239,19 +218,11 @@ TransLogServer::setDomainConfig(const DomainConfig & cfg) { return *this; } -void -TransLogServer::commitIfStale() { - MonitorGuard domainMonitor(_domainMutex); - for (const auto &domain : _domains) { - domain.second->commitIfStale(); - } -} - DomainStats TransLogServer::getDomainStats() const { DomainStats retval; - Guard domainGuard(_domainMutex); + Guard domainGuard(_lock); for (const auto &elem : _domains) { retval[elem.first] = elem.second->getDomainInfo(); } @@ -262,7 +233,7 @@ std::vector TransLogServer::getDomainNames() { std::vector names; - Guard guard(_domainMutex); + Guard guard(_lock); for(const auto &domain: _domains) { names.push_back(domain.first); } @@ -272,7 +243,7 @@ TransLogServer::getDomainNames() Domain::SP TransLogServer::findDomain(stringref domainName) { - Guard domainGuard(_domainMutex); + Guard domainGuard(_lock); Domain::SP domain; DomainList::iterator found(_domains.find(domainName)); if (found != _domains.end()) { @@ -392,7 +363,7 @@ writeDomainDir(std::lock_guard &guard, vespalib::File::sync(dir); } -class RPCDestination : public Destination { +class RPCDestination : public Session::Destination { public: RPCDestination(FRT_Supervisor & supervisor, FNET_Connection * connection) : _supervisor(supervisor), _connection(connection), _ok(true) @@ -477,7 +448,7 @@ TransLogServer::createDomain(FRT_RPCRequest *req) try { domain = std::make_shared(domainName, dir(), _commitExecutor, _sessionExecutor, _domainConfig, _fileHeaderContext); - Guard domainGuard(_domainMutex); + Guard domainGuard(_lock); _domains[domain->name()] = domain; writeDomainDir(domainGuard, dir(), domainList(), _domains); } catch (const std::exception & e) { @@ -506,12 +477,12 @@ TransLogServer::deleteDomain(FRT_RPCRequest *req) try { if (domain) { domain->markDeleted(); - Guard domainGuard(_domainMutex); + Guard domainGuard(_lock); _domains.erase(domainName); } vespalib::rmdir(Domain::getDir(dir(), domainName), true); vespalib::File::sync(dir()); - Guard domainGuard(_domainMutex); + Guard domainGuard(_lock); writeDomainDir(domainGuard, dir(), domainList(), _domains); } catch (const std::exception & e) { msg = make_string("Failed deleting %s domain. Exception = %s", domainName, e.what()); @@ -552,7 +523,7 @@ TransLogServer::listDomains(FRT_RPCRequest *req) LOG(debug, "listDomains()"); vespalib::string domains; - Guard domainGuard(_domainMutex); + Guard domainGuard(_lock); for(DomainList::const_iterator it(_domains.begin()), mt(_domains.end()); it != mt; it++) { domains += it->second->name(); domains += "\n"; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 8e0e8041385..3f945977386 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -1,29 +1,30 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "domainconfig.h" +#include "domain.h" #include #include #include #include #include + class FRT_Supervisor; class FNET_Transport; -class FNET_Task; namespace search::common { class FileHeaderContext; } + namespace search::transactionlog { class TransLogServerExplorer; -class Domain; class TransLogServer : public document::Runnable, private FRT_Invokable, public Writer { public: friend class TransLogServerExplorer; - using SP = std::shared_ptr; - using DomainSP = std::shared_ptr; + typedef std::unique_ptr UP; + typedef std::shared_ptr SP; + TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, @@ -32,7 +33,6 @@ public: const common::FileHeaderContext &fileHeaderContext); ~TransLogServer() override; DomainStats getDomainStats() const; - void commitIfStale(); void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override; TransLogServer & setDomainConfig(const DomainConfig & cfg); @@ -71,13 +71,13 @@ private: void downSession(FRT_RPCRequest *req); std::vector getDomainNames(); - DomainSP findDomain(vespalib::stringref name); + Domain::SP findDomain(vespalib::stringref name); vespalib::string dir() const { return _baseDir + "/" + _name; } vespalib::string domainList() const { return dir() + "/" + _name + ".domains"; } static const Session::SP & getSession(FRT_RPCRequest *req); - using DomainList = std::map; + using DomainList = std::map; vespalib::string _name; vespalib::string _baseDir; @@ -87,15 +87,12 @@ private: std::unique_ptr _threadPool; std::unique_ptr _transport; std::unique_ptr _supervisor; - std::unique_ptr _staleCommitTask; DomainList _domains; - mutable std::mutex _domainMutex; // Protects _domains - std::condition_variable _domainCondition; + mutable std::mutex _lock; // Protects _domains std::mutex _fileLock; // Protects the creating and deleting domains including file system operations. document::Queue _reqQ; const common::FileHeaderContext &_fileHeaderContext; using Guard = std::lock_guard; - using MonitorGuard = std::unique_lock; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp index 12c38ab5739..d83623661ff 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp @@ -66,8 +66,7 @@ getDomainConfig(const searchlib::TranslogserverConfig & cfg) { .setCompressionLevel(cfg.compression.level) .setPartSizeLimit(cfg.filesizemax) .setChunkSizeLimit(cfg.chunk.sizelimit) - .setChunkAgeLimit(vespalib::from_s(cfg.chunk.agelimit)) - .setFSyncOnCommit(cfg.usefsync); + .setChunkAgeLimit(vespalib::from_s(cfg.chunk.agelimit)); return dcfg; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h index fb93559c29f..d46805c105c 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h @@ -23,9 +23,11 @@ private: void configure(std::unique_ptr cfg) override ; public: + typedef std::unique_ptr UP; + TransLogServerApp(const config::ConfigUri & tlsConfigUri, const common::FileHeaderContext &fileHeaderContext); - ~TransLogServerApp() override; + ~TransLogServerApp(); TransLogServer::SP getTransLogServer() const; -- cgit v1.2.3