From 31b179d62e3dec85d9af5058857ed16fa81a4002 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 21 Sep 2020 18:23:22 +0000 Subject: Add CommitResult. --- .../src/vespa/searchlib/transactionlog/common.cpp | 26 ++++++++++++++++++--- .../src/vespa/searchlib/transactionlog/common.h | 27 +++++++++++++++++++--- 2 files changed, 47 insertions(+), 6 deletions(-) (limited to 'searchlib/src') diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp index 556ebca06ec..3308f3182dc 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp @@ -121,11 +121,26 @@ Packet::add(const Packet::Entry & e) _range.to(e.serial()); } +Writer::CommitResult::CommitResult() + : _callBacks() +{} +Writer::CommitResult::CommitResult( CommitPayload commitPayLoad) + : _callBacks(std::move(commitPayLoad)) +{} + +Writer::CommitResult::~CommitResult() = default; + CommitChunk::CommitChunk(size_t reserveBytes, size_t reserveCount) : _data(reserveBytes), - _callBacks() + _callBacks(std::make_shared()) +{ + _callBacks->reserve(reserveCount); +} + +CommitChunk::CommitChunk(size_t reserveBytes, Writer::CommitPayload postponed) + : _data(reserveBytes), + _callBacks(std::move(postponed)) { - _callBacks.reserve(reserveCount); } CommitChunk::~CommitChunk() = default; @@ -133,7 +148,12 @@ CommitChunk::~CommitChunk() = default; void CommitChunk::add(const Packet &packet, Writer::DoneCallback onDone) { _data.merge(packet); - _callBacks.emplace_back(std::move(onDone)); + _callBacks->emplace_back(std::move(onDone)); +} + +Writer::CommitResult +CommitChunk::createCommitResult() const { + return Writer::CommitResult(_callBacks); } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index 7cdfad44b87..c8f7a81ac7d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -84,6 +84,21 @@ int makeDirectory(const char * dir); class Writer { public: using DoneCallback = std::shared_ptr; + using DoneCallbacksList = std::vector; + using CommitPayload = std::shared_ptr; + class CommitResult { + public: + CommitResult(); + CommitResult(CommitPayload callBacks); + CommitResult(CommitResult &&) noexcept = default; + CommitResult & operator = (CommitResult &&) noexcept = default; + CommitResult(const CommitResult &) = delete; + CommitResult & operator = (const CommitResult &) = delete; + ~CommitResult(); + size_t getNumOperations() const { return _callBacks->size(); } + private: + CommitPayload _callBacks; + }; virtual ~Writer() = default; virtual void commit(const Packet & packet, DoneCallback done) = 0; }; @@ -106,14 +121,20 @@ public: class CommitChunk { public: CommitChunk(size_t reserveBytes, size_t reserveCount); + CommitChunk(size_t reserveBytes, Writer::CommitPayload postponed); ~CommitChunk(); + bool empty() const { return _callBacks->empty(); } void add(const Packet & packet, Writer::DoneCallback onDone); size_t sizeBytes() const { return _data.sizeBytes(); } const Packet & getPacket() const { return _data; } - size_t getNumCallBacks() const { return _callBacks.size(); } + size_t getNumCallBacks() const { return _callBacks->size(); } + Writer::CommitResult createCommitResult() const; + void setCommitDoneCallback(Writer::DoneCallback onDone) { _onCommitDone = std::move(onDone); } + Writer::CommitPayload stealCallbacks() { return std::move(_callBacks); } private: - Packet _data; - std::vector _callBacks; + Packet _data; + Writer::CommitPayload _callBacks; + Writer::DoneCallback _onCommitDone; }; } -- cgit v1.2.3 From 6eab511ac74f9c60427d3510a55003e4c7d380ac Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 21 Sep 2020 19:07:55 +0000 Subject: Add startCommit method --- .../documentdb/feedhandler/feedhandler_test.cpp | 1 + .../lid_space_compaction_test.cpp | 3 ++ .../maintenancecontroller_test.cpp | 3 ++ .../vespa/searchcore/proton/server/feedhandler.cpp | 42 ++++++++++++++++++---- .../vespa/searchcore/proton/server/feedhandler.h | 1 + .../searchcore/proton/server/i_operation_storer.h | 6 ++++ .../tests/transactionlog/translogclient_test.cpp | 5 +-- .../src/vespa/searchlib/transactionlog/common.h | 3 +- .../src/vespa/searchlib/transactionlog/domain.cpp | 8 ++++- .../src/vespa/searchlib/transactionlog/domain.h | 3 +- .../searchlib/transactionlog/translogserver.cpp | 3 +- 11 files changed, 65 insertions(+), 13 deletions(-) (limited to 'searchlib/src') diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index ad0ce0b26c4..72592cca681 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -434,6 +434,7 @@ struct MyTlsWriter : TlsWriter { MyTlsWriter() : store_count(0), erase_count(0), erase_return(true) {} void appendOperation(const FeedOperation &, DoneCallback) override { ++store_count; } + CommitResult startCommit(DoneCallback) override { return CommitResult(); } bool erase(SerialNum) override { ++erase_count; return erase_return; } SerialNum sync(SerialNum syncTo) override { diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp index 31882061b1c..04444647b5d 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp @@ -172,6 +172,9 @@ struct MyStorer : public IOperationStorer { ++_compactCnt; } } + CommitResult startCommit(DoneCallback) override { + return CommitResult(); + } }; struct MyFrozenBucketHandler : public IFrozenBucketHandler { diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 8df62705cb3..f033dfd50a8 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -235,6 +235,9 @@ public: // Implements IOperationStorer void appendOperation(const FeedOperation &op, DoneCallback) override; + CommitResult startCommit(DoneCallback) override { + return CommitResult(); + } uint32_t getHeartBeats() const { return _heartBeats; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 734ef01d33a..c45d216a631 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -58,17 +58,20 @@ class TlsMgrWriter : public TlsWriter { std::shared_ptr _writer; public: TlsMgrWriter(TransactionLogManager &tls_mgr, - const search::transactionlog::WriterFactory & factory) : - _tls_mgr(tls_mgr), - _writer(factory.getWriter(tls_mgr.getDomainName())) + const search::transactionlog::WriterFactory & factory) + : _tls_mgr(tls_mgr), + _writer(factory.getWriter(tls_mgr.getDomainName())) { } void appendOperation(const FeedOperation &op, DoneCallback onDone) override; + CommitResult startCommit(DoneCallback onDone) override { + return _writer->startCommit(std::move(onDone)); + } bool erase(SerialNum oldest_to_keep) override; SerialNum sync(SerialNum syncTo) override; }; - -void TlsMgrWriter::appendOperation(const FeedOperation &op, DoneCallback onDone) { +void +TlsMgrWriter::appendOperation(const FeedOperation &op, DoneCallback onDone) { using Packet = search::transactionlog::Packet; vespalib::nbostream stream; op.serialize(stream); @@ -77,9 +80,11 @@ void TlsMgrWriter::appendOperation(const FeedOperation &op, DoneCallback onDone) Packet::Entry entry(op.getSerialNum(), op.getType(), vespalib::ConstBufferRef(stream.data(), stream.size())); Packet packet(entry.serializedSize()); packet.add(entry); - _writer->commit(packet, std::move(onDone)); + _writer->append(packet, std::move(onDone)); } -bool TlsMgrWriter::erase(SerialNum oldest_to_keep) { + +bool +TlsMgrWriter::erase(SerialNum oldest_to_keep) { return _tls_mgr.getSession()->erase(oldest_to_keep); } @@ -104,6 +109,24 @@ TlsMgrWriter::sync(SerialNum syncTo) throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo)); } +class OnCommitDone : public search::IDestructorCallback { +public: + OnCommitDone(Executor & executor, std::unique_ptr task) + : _executor(executor), + _task(std::move(task)) + {} + ~OnCommitDone() override { _executor.execute(std::move(_task)); } +private: + Executor & _executor; + std::unique_ptr _task; +}; + +template +struct KeepAlive : public search::IDestructorCallback { + explicit KeepAlive(T toKeep) : _toKeep(std::move(toKeep)) { } + ~KeepAlive() override = default; + T _toKeep; +}; } // namespace void @@ -479,6 +502,11 @@ FeedHandler::appendOperation(const FeedOperation &op, TlsWriter::DoneCallback on _tlsWriter->appendOperation(op, std::move(onDone)); } +FeedHandler::CommitResult +FeedHandler::startCommit(DoneCallback onDone) { + return _tlsWriter->startCommit(std::move(onDone)); +} + void FeedHandler::storeOperationSync(const FeedOperation &op) { vespalib::Gate gate; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 29961f4a6cc..6a5fa318959 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -226,6 +226,7 @@ public: void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override; void syncTls(SerialNum syncTo); void appendOperation(const FeedOperation &op, DoneCallback onDone) override; + CommitResult startCommit(DoneCallback onDone) override; void storeOperationSync(const FeedOperation & op); void considerDelayedPrune(); }; diff --git a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h index 47b81a9a17f..481a70564cc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h @@ -14,12 +14,18 @@ class FeedOperation; struct IOperationStorer { using DoneCallback = search::transactionlog::Writer::DoneCallback; + using CommitResult = search::transactionlog::Writer::CommitResult; virtual ~IOperationStorer() = default; /** * Assign serial number to (if not set) and store the given operation. */ virtual void appendOperation(const FeedOperation &op, DoneCallback onDone) = 0; + void storeOperation(const FeedOperation &op, DoneCallback onDone) { + appendOperation(op, onDone); + startCommit(std::move(onDone)); + } + virtual CommitResult startCommit(DoneCallback onDone) = 0; }; } // namespace proton diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index 9e5021b4778..478bc594368 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -329,11 +329,12 @@ fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numP Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); p->add(e); if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) { - domainWriter->commit(*p, std::make_shared(inFlight)); + domainWriter->append(*p, std::make_shared(inFlight)); p = std::make_unique(DEFAULT_PACKET_SIZE); } } - domainWriter->commit(*p, std::make_shared(inFlight)); + domainWriter->append(*p, std::make_shared(inFlight)); + domainWriter->startCommit(Writer::DoneCallback()); LOG(info, "Inflight %ld", inFlight.load()); } while (inFlight.load() != 0) { diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index c8f7a81ac7d..43b20b57045 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -100,7 +100,8 @@ public: CommitPayload _callBacks; }; virtual ~Writer() = default; - virtual void commit(const Packet & packet, DoneCallback done) = 0; + virtual void append(const Packet & packet, DoneCallback done) = 0; + virtual CommitResult startCommit(DoneCallback onDone) = 0; }; class WriterFactory { diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 415ccafda70..7a7be3ab86b 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -310,8 +310,14 @@ Domain::optionallyRotateFile(SerialNum serialNum) { return dp; } +Domain::CommitResult +Domain::startCommit(DoneCallback onDone) { + (void) onDone; + return CommitResult(); +} + void -Domain::commit(const Packet & packet, Writer::DoneCallback onDone) +Domain::append(const Packet & packet, Writer::DoneCallback onDone) { (void) onDone; vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size()); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 9adff564cc8..86e3681f0a1 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -27,7 +27,8 @@ public: const vespalib::string & name() const { return _name; } bool erase(SerialNum to); - void commit(const Packet & packet, Writer::DoneCallback onDone) override; + void append(const Packet & packet, Writer::DoneCallback onDone) override; + CommitResult startCommit(DoneCallback onDone) override; int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr dest); SerialNum begin() const; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 269ed7e9380..9b7d328d9cb 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -572,7 +572,8 @@ TransLogServer::domainCommit(FRT_RPCRequest *req) Packet packet(params[1]._data._buf, params[1]._data._len); try { vespalib::Gate gate; - domain->commit(packet, make_shared(gate)); + domain->append(packet, make_shared(gate)); + domain->startCommit(make_shared()); gate.await(); ret.AddInt32(0); ret.AddString("ok"); -- cgit v1.2.3 From 20d0f96d2e6f276669293780adc67421aea0dee2 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 22 Sep 2020 10:05:23 +0000 Subject: Add [[nodiscard]] attribute to startCommit. --- .../vespa/searchcore/proton/server/feedhandler.cpp | 2 +- .../vespa/searchcore/proton/server/feedhandler.h | 2 +- .../searchcore/proton/server/i_operation_storer.h | 6 +--- .../tests/transactionlog/translogclient_test.cpp | 2 +- .../src/vespa/searchlib/common/fileheadercontext.h | 34 +++++------------- .../src/vespa/searchlib/transactionlog/common.h | 6 ++-- .../src/vespa/searchlib/transactionlog/domain.cpp | 3 +- .../src/vespa/searchlib/transactionlog/domain.h | 40 +++++++++++----------- .../searchlib/transactionlog/translogserver.cpp | 2 +- 9 files changed, 37 insertions(+), 60 deletions(-) (limited to 'searchlib/src') diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index c45d216a631..37dfddf0c2c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -63,7 +63,7 @@ public: _writer(factory.getWriter(tls_mgr.getDomainName())) { } void appendOperation(const FeedOperation &op, DoneCallback onDone) override; - CommitResult startCommit(DoneCallback onDone) override { + [[nodiscard]] CommitResult startCommit(DoneCallback onDone) override { return _writer->startCommit(std::move(onDone)); } bool erase(SerialNum oldest_to_keep) override; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 6a5fa318959..4807c596130 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -226,7 +226,7 @@ public: void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override; void syncTls(SerialNum syncTo); void appendOperation(const FeedOperation &op, DoneCallback onDone) override; - CommitResult startCommit(DoneCallback onDone) override; + [[nodiscard]] CommitResult startCommit(DoneCallback onDone) override; void storeOperationSync(const FeedOperation & op); void considerDelayedPrune(); }; diff --git a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h index 481a70564cc..b276779c2ee 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h @@ -21,11 +21,7 @@ struct IOperationStorer * Assign serial number to (if not set) and store the given operation. */ virtual void appendOperation(const FeedOperation &op, DoneCallback onDone) = 0; - void storeOperation(const FeedOperation &op, DoneCallback onDone) { - appendOperation(op, onDone); - startCommit(std::move(onDone)); - } - virtual CommitResult startCommit(DoneCallback onDone) = 0; + [[nodiscard]] virtual CommitResult startCommit(DoneCallback onDone) = 0; }; } // namespace proton diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index 478bc594368..fffb70467a3 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -334,7 +334,7 @@ fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numP } } domainWriter->append(*p, std::make_shared(inFlight)); - domainWriter->startCommit(Writer::DoneCallback()); + auto keep = domainWriter->startCommit(Writer::DoneCallback()); LOG(info, "Inflight %ld", inFlight.load()); } while (inFlight.load() != 0) { diff --git a/searchlib/src/vespa/searchlib/common/fileheadercontext.h b/searchlib/src/vespa/searchlib/common/fileheadercontext.h index 6f76fe1717d..8bb3d6a56a6 100644 --- a/searchlib/src/vespa/searchlib/common/fileheadercontext.h +++ b/searchlib/src/vespa/searchlib/common/fileheadercontext.h @@ -3,40 +3,22 @@ #include -namespace vespalib -{ - -class GenericHeader; - +namespace vespalib { + class GenericHeader; } -namespace search -{ - -namespace common -{ +namespace search::common { class FileHeaderContext { public: FileHeaderContext(); + virtual ~FileHeaderContext(); - virtual - ~FileHeaderContext(); + virtual void addTags(vespalib::GenericHeader &header, const vespalib::string &name) const = 0; - virtual void - addTags(vespalib::GenericHeader &header, - const vespalib::string &name) const = 0; - - static void - addCreateAndFreezeTime(vespalib::GenericHeader &header); - - static void - setFreezeTime(vespalib::GenericHeader &header); + static void addCreateAndFreezeTime(vespalib::GenericHeader &header); + static void setFreezeTime(vespalib::GenericHeader &header); }; - -} // namespace common - -} // namespace search - +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index 43b20b57045..5d07d51cdf2 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -16,7 +16,7 @@ class SerialNumRange { public: SerialNumRange() : _from(0), _to(0) { } - SerialNumRange(SerialNum f) : _from(f), _to(f ? f-1 : f) { } + explicit SerialNumRange(SerialNum f) : _from(f), _to(f ? f-1 : f) { } SerialNumRange(SerialNum f, SerialNum t) : _from(f), _to(t) { } bool operator == (const SerialNumRange & b) const { return cmp(b) == 0; } bool operator < (const SerialNumRange & b) const { return cmp(b) < 0; } @@ -63,7 +63,7 @@ public: vespalib::ConstBufferRef _data; }; public: - Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { } + explicit Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { } Packet(const void * buf, size_t sz); void add(const Entry & data); void clear() { _buf.clear(); _count = 0; _range.from(0); _range.to(0); } @@ -101,7 +101,7 @@ public: }; virtual ~Writer() = default; virtual void append(const Packet & packet, DoneCallback done) = 0; - virtual CommitResult startCommit(DoneCallback onDone) = 0; + [[nodiscard]] virtual CommitResult startCommit(DoneCallback onDone) = 0; }; class WriterFactory { diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 7a7be3ab86b..ff2d963ccd9 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -25,7 +25,6 @@ using vespalib::makeClosure; using vespalib::makeLambdaTask; using vespalib::Monitor; using vespalib::MonitorGuard; -using search::common::FileHeaderContext; using std::runtime_error; using std::make_shared; @@ -35,7 +34,7 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & exec const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext) : _config(cfg), _lastSerial(0), - _singleCommiter(std::make_unique(1, 128*1024)), + _singleCommitter(std::make_unique(1, 128 * 1024)), _executor(executor), _sessionId(1), _syncMonitor(), diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 86e3681f0a1..361ac8c1805 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -18,8 +18,9 @@ public: using SP = std::shared_ptr; using Executor = vespalib::SyncableThreadExecutor; using DomainPartSP = std::shared_ptr; + using FileHeaderContext = common::FileHeaderContext; Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & executor, - const DomainConfig & cfg, const common::FileHeaderContext &fileHeaderContext); + const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext); ~Domain() override; @@ -28,14 +29,13 @@ public: bool erase(SerialNum to); void append(const Packet & packet, Writer::DoneCallback onDone) override; - CommitResult startCommit(DoneCallback onDone) override; + [[nodiscard]] CommitResult startCommit(DoneCallback onDone) override; int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr dest); SerialNum begin() const; SerialNum end() const; SerialNum getSynced() const; void triggerSyncNow(); - bool commitIfStale(); bool getMarkedDeleted() const { return _markedDeleted; } void markDeleted() { _markedDeleted = true; } @@ -73,23 +73,23 @@ private: using DomainPartList = std::map; using DurationSeconds = std::chrono::duration; - DomainConfig _config; - SerialNum _lastSerial; - std::unique_ptr _singleCommiter; - Executor & _executor; - std::atomic _sessionId; - vespalib::Monitor _syncMonitor; - bool _pendingSync; - vespalib::string _name; - DomainPartList _parts; - vespalib::Lock _lock; - vespalib::Monitor _currentChunkMonitor; - vespalib::Lock _sessionLock; - SessionList _sessions; - DurationSeconds _maxSessionRunTime; - vespalib::string _baseDir; - const common::FileHeaderContext &_fileHeaderContext; - bool _markedDeleted; + DomainConfig _config; + SerialNum _lastSerial; + std::unique_ptr _singleCommitter; + Executor &_executor; + std::atomic _sessionId; + vespalib::Monitor _syncMonitor; + bool _pendingSync; + vespalib::string _name; + DomainPartList _parts; + vespalib::Lock _lock; + vespalib::Monitor _currentChunkMonitor; + vespalib::Lock _sessionLock; + SessionList _sessions; + DurationSeconds _maxSessionRunTime; + vespalib::string _baseDir; + const FileHeaderContext &_fileHeaderContext; + bool _markedDeleted; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 9b7d328d9cb..7be3dd708a5 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -573,7 +573,7 @@ TransLogServer::domainCommit(FRT_RPCRequest *req) try { vespalib::Gate gate; domain->append(packet, make_shared(gate)); - domain->startCommit(make_shared()); + auto keep = domain->startCommit(make_shared()); gate.await(); ret.AddInt32(0); ret.AddString("ok"); -- cgit v1.2.3