diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-12 14:18:26 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-17 15:52:10 +0200 |
commit | b2de6349a5e71ac46b1bd7f38ff38c96f4b1a189 (patch) | |
tree | 25b6e907470902fbf507f4913bead957ec424a8a /searchcore | |
parent | 4d335b9bde8f4c4b624471a5eb956af4ae6ba73e (diff) |
Introduce async write interface in the TLS.
Wire it in all the way up and in to proton.
The implementation is still synchronous.
Diffstat (limited to 'searchcore')
12 files changed, 88 insertions, 92 deletions
diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index 3b199d266a8..bce3fb7267c 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -21,9 +21,8 @@ #include <vespa/searchcore/proton/server/memoryconfigstore.h> #include <vespa/searchcore/proton/server/searchview.h> #include <vespa/searchcore/proton/server/summaryadapter.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/searchlib/common/transport.h> -#include <vespa/searchlib/docstore/logdocumentstore.h> #include <vespa/searchlib/engine/docsumapi.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> @@ -254,7 +253,7 @@ public: op.setSerialNum(serialNum); op.setDbDocumentId(dbdId); op.setPrevDbDocumentId(prevDbdId); - _ddb->getFeedHandler().storeOperation(op); + _ddb->getFeedHandler().storeOperation(op, std::make_shared<search::IgnoreCallback>()); SearchView *sv(dynamic_cast<SearchView *>(_ddb->getReadySubDB()->getSearchView().get())); if (sv != NULL) { // cf. FeedView::putAttributes() diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 823c31dd1c2..b8ffc41d3cd 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -370,7 +370,7 @@ struct MyTlsWriter : TlsWriter { bool erase_return; MyTlsWriter() : store_count(0), erase_count(0), erase_return(true) {} - void storeOperation(const FeedOperation &) override { ++store_count; } + void storeOperation(const FeedOperation &, DoneCallback) override { ++store_count; } bool erase(SerialNum) override { ++erase_count; return erase_return; } SerialNum sync(SerialNum syncTo) override { diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp index 55f71da9687..56bd99c90f6 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp @@ -138,7 +138,7 @@ struct MyStorer : public IOperationStorer : _moveCnt(0), _compactCnt(0) {} - virtual void storeOperation(FeedOperation &op) override { + void storeOperation(const FeedOperation &op, DoneCallback) override { if (op.getType() == FeedOperation::MOVE) { ++ _moveCnt; } else if (op.getType() == FeedOperation::COMPACT_LID_SPACE) { diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 559dbb240a8..f20ad01bcf6 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -26,7 +26,7 @@ #include <vespa/searchcore/proton/test/test.h> #include <vespa/searchlib/attribute/attributecontext.h> #include <vespa/searchlib/attribute/attributeguard.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/searchlib/common/idocumentmetastore.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/vespalib/data/slime/slime.h> @@ -232,7 +232,7 @@ public: } // Implements IOperationStorer - virtual void storeOperation(FeedOperation &op) override; + virtual void storeOperation(const FeedOperation &op, DoneCallback) override; uint32_t getHeartBeats() { return _heartBeats; @@ -781,7 +781,6 @@ MyFeedHandler::isExecutorThread() void MyFeedHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx) { - (void) moveDoneCtx; assert(isExecutorThread()); assert(op.getValidPrevDbdId()); _subDBs[op.getSubDbId()]->prepareMove(op); @@ -792,7 +791,7 @@ MyFeedHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx assert(op.getPrevSubDbId() != 1u); assert(op.getSubDbId() < _subDBs.size()); assert(op.getPrevSubDbId() < _subDBs.size()); - storeOperation(op); + storeOperation(op, std::move(moveDoneCtx)); _subDBs[op.getSubDbId()]->handleMove(op); _subDBs[op.getPrevSubDbId()]->handleMove(op); } @@ -803,7 +802,7 @@ MyFeedHandler::performPruneRemovedDocuments(PruneRemovedDocumentsOperation &op) { assert(isExecutorThread()); if (op.getLidsToRemove()->getNumLids() != 0u) { - storeOperation(op); + storeOperation(op, std::make_shared<search::IgnoreCallback>()); // magic number. _subDBs[1u]->handlePruneRemovedDocuments(op); } @@ -826,9 +825,9 @@ MyFeedHandler::setSubDBs(const std::vector<MyDocumentSubDB *> &subDBs) void -MyFeedHandler::storeOperation(FeedOperation &op) +MyFeedHandler::storeOperation(const FeedOperation &op, DoneCallback) { - op.setSerialNum(incSerialNum()); + const_cast<FeedOperation &>(op).setSerialNum(incSerialNum()); } @@ -1011,22 +1010,16 @@ MaintenanceControllerFixture::performForwardMaintenanceConfig() void -MaintenanceControllerFixture::insertDocs(const test::UserDocuments &docs, - MyDocumentSubDB &subDb) +MaintenanceControllerFixture::insertDocs(const test::UserDocuments &docs, MyDocumentSubDB &subDb) { - for (test::UserDocuments::Iterator itr = docs.begin(); - itr != docs.end(); - ++itr) { + for (auto itr = docs.begin(); itr != docs.end(); ++itr) { const test::BucketDocuments &bucketDocs = itr->second; for (size_t i = 0; i < bucketDocs.getDocs().size(); ++i) { const test::Document &testDoc = bucketDocs.getDocs()[i]; - PutOperation op(testDoc.getBucket(), - testDoc.getTimestamp(), - testDoc.getDoc()); - op.setDbDocumentId(DbDocumentId(subDb.getSubDBId(), - testDoc.getLid())); - _fh.storeOperation(op); + PutOperation op(testDoc.getBucket(), testDoc.getTimestamp(), testDoc.getDoc()); + op.setDbDocumentId(DbDocumentId(subDb.getSubDBId(), testDoc.getLid())); + _fh.storeOperation(op, std::make_shared<search::IgnoreCallback>()); subDb.handlePut(op); } } @@ -1038,18 +1031,13 @@ MaintenanceControllerFixture::removeDocs(const test::UserDocuments &docs, Timestamp timestamp) { - for (test::UserDocuments::Iterator itr = docs.begin(); - itr != docs.end(); - ++itr) { + for (auto itr = docs.begin(); itr != docs.end(); ++itr) { const test::BucketDocuments &bucketDocs = itr->second; for (size_t i = 0; i < bucketDocs.getDocs().size(); ++i) { const test::Document &testDoc = bucketDocs.getDocs()[i]; - RemoveOperation op(testDoc.getBucket(), - timestamp, - testDoc.getDoc()->getId()); - op.setDbDocumentId(DbDocumentId(_removed.getSubDBId(), - testDoc.getLid())); - _fh.storeOperation(op); + RemoveOperation op(testDoc.getBucket(), timestamp, testDoc.getDoc()->getId()); + op.setDbDocumentId(DbDocumentId(_removed.getSubDBId(), testDoc.getLid())); + _fh.storeOperation(op, std::make_shared<search::IgnoreCallback>()); _removed.handleRemove(op); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 4198803d1fe..5babacfc4b6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -307,7 +307,7 @@ DocumentDB::enterReprocessState() if (!runner.empty()) { runner.run(); NoopOperation op; - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); _subDBs.pruneRemovedFields(op.getSerialNum()); } @@ -397,15 +397,14 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum _config_store->saveConfig(*configSnapshot, serialNum); // save entry in transaction log NewConfigOperation op(serialNum, *_config_store); - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); } bool hasVisibilityDelayChanged = false; { bool elidedConfigSave = equalReplayConfig && tlsReplayDone; // Flush changes to attributes and memory index, cf. visibilityDelay - _feedView.get()->forceCommit(elidedConfigSave ? serialNum : - serialNum - 1); + _feedView.get()->forceCommit(elidedConfigSave ? serialNum : serialNum - 1); _writeService.sync(); fastos::TimeStamp visibilityDelay = configSnapshot->getMaintenanceConfigSP()->getVisibilityDelay(); hasVisibilityDelayChanged = (visibilityDelay != _visibility.getVisibilityDelay()); @@ -585,7 +584,7 @@ DocumentDB::saveInitialConfig(const DocumentDBConfig &configSnapshot) // pruned at once anyway. // save noop entry in transaction log NoopOperation op; - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); // Wipe everything in transaction log before initial config. try { @@ -609,7 +608,7 @@ DocumentDB::resumeSaveConfig() SerialNum confSerial = _feedHandler.incSerialNum(); // resume operation, i.e. save config entry in transaction log NewConfigOperation op(confSerial, *_config_store); - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); } @@ -776,7 +775,7 @@ DocumentDB::enterRedoReprocessState() runner.run(); _subDBs.onReprocessDone(_feedHandler.getSerialNum()); NoopOperation op; - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); _subDBs.pruneRemovedFields(op.getSerialNum()); } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index b01ba43cb49..5890489415e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -12,7 +12,7 @@ #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> #include <vespa/searchcore/proton/persistenceengine/transport_latch.h> #include <vespa/searchcorespi/index/ithreadingservice.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/exceptions.h> #include <unistd.h> @@ -46,8 +46,8 @@ ignoreOperation(const DocumentOperation &op) { } // namespace -void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op) { - TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op); +void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op, DoneCallback onDone) { + TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op, std::move(onDone)); } bool FeedHandler::TlsMgrWriter::erase(SerialNum oldest_to_keep) { return _tls_mgr.getSession()->erase(oldest_to_keep); @@ -72,7 +72,6 @@ FeedHandler::TlsMgrWriter::sync(SerialNum syncTo) LOG(spam, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo); } throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo)); - return 0; } void @@ -94,7 +93,7 @@ void FeedHandler::performPut(FeedToken token, PutOperation &op) { } return; } - storeOperation(op); + storeOperation(op, token); if (token) { token->setResult(std::make_unique<Result>(), false); } @@ -121,7 +120,7 @@ FeedHandler::performUpdate(FeedToken token, UpdateOperation &op) void FeedHandler::performInternalUpdate(FeedToken token, UpdateOperation &op) { - storeOperation(op); + storeOperation(op, token); if (token) { token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true); } @@ -137,7 +136,7 @@ FeedHandler::createNonExistingDocument(FeedToken token, const UpdateOperation &o op.getUpdate()->applyTo(*doc); PutOperation putOp(op.getBucketId(), op.getTimestamp(), doc); _activeFeedView->preparePut(putOp); - storeOperation(putOp); + storeOperation(putOp, token); if (token) { token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true); } @@ -160,7 +159,7 @@ void FeedHandler::performRemove(FeedToken token, RemoveOperation &op) { if (op.getPrevDbDocumentId().valid()) { assert(op.getValidNewOrPrevDbdId()); assert(op.notMovingLidInSameSubDb()); - storeOperation(op); + storeOperation(op, token); if (token) { bool documentWasFound = !op.getPrevMarkedAsRemoved(); token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound); @@ -168,7 +167,7 @@ void FeedHandler::performRemove(FeedToken token, RemoveOperation &op) { _activeFeedView->handleRemove(std::move(token), op); } else if (op.hasDocType()) { assert(op.getDocType() == _docTypeName.getName()); - storeOperation(op); + storeOperation(op, token); if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); } @@ -186,20 +185,16 @@ FeedHandler::performGarbageCollect(FeedToken token) (void) token; } - void FeedHandler::performCreateBucket(FeedToken token, CreateBucketOperation &op) { - (void) token; - storeOperation(op); + storeOperation(op, std::move(token)); _bucketDBHandler->handleCreateBucket(op.getBucketId()); } - void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op) { - (void) token; _activeFeedView->prepareDeleteBucket(op); - storeOperation(op); + storeOperation(op, std::move(token)); // Delete documents in bucket _activeFeedView->handleDeleteBucket(op); // Delete bucket itself, should no longer have documents. @@ -207,21 +202,16 @@ void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op } - void FeedHandler::performSplit(FeedToken token, SplitBucketOperation &op) { - (void) token; - storeOperation(op); + storeOperation(op, std::move(token)); _bucketDBHandler->handleSplit(op.getSerialNum(), op.getSource(), op.getTarget1(), op.getTarget2()); } - void FeedHandler::performJoin(FeedToken token, JoinBucketsOperation &op) { - (void) token; - storeOperation(op); + storeOperation(op, std::move(token)); _bucketDBHandler->handleJoin(op.getSerialNum(), op.getSource1(), op.getSource2(), op.getTarget()); } - void FeedHandler::performSync() { @@ -412,18 +402,28 @@ FeedHandler::isDoingReplay() const { return _tlsMgr.isDoingReplay(); } -bool FeedHandler::getTransactionLogReplayDone() const { +bool +FeedHandler::getTransactionLogReplayDone() const { return _tlsMgr.getReplayDone(); } -void FeedHandler::storeOperation(FeedOperation &op) { +void +FeedHandler::storeOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) { if (!op.getSerialNum()) { - op.setSerialNum(incSerialNum()); + const_cast<FeedOperation &>(op).setSerialNum(incSerialNum()); } - _tlsWriter.storeOperation(op); + _tlsWriter.storeOperation(op, std::move(onDone)); } -void FeedHandler::tlsPrune(SerialNum oldest_to_keep) { +void +FeedHandler::storeOperationSync(const FeedOperation &op) { + vespalib::Gate gate; + storeOperation(op, std::make_shared<search::GateCallback>(gate)); + gate.await(); +} + +void +FeedHandler::tlsPrune(SerialNum oldest_to_keep) { if (!_tlsWriter.erase(oldest_to_keep)) { throw IllegalStateException(make_string("Failed to prune TLS to token %" PRIu64 ".", oldest_to_keep)); } @@ -533,7 +533,7 @@ FeedHandler::handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCa assert(op.getValidDbdId()); assert(op.getValidPrevDbdId()); assert(op.getSubDbId() != op.getPrevSubDbId()); - storeOperation(op); + storeOperation(op, moveDoneCtx); _activeFeedView->handleMove(op, std::move(moveDoneCtx)); } @@ -577,7 +577,9 @@ performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) { const LidVectorContext::SP lids_to_remove = pruneOp.getLidsToRemove(); if (lids_to_remove && lids_to_remove->getNumLids() != 0) { - storeOperation(pruneOp); + vespalib::Gate gate; + storeOperation(pruneOp, std::make_shared<search::GateCallback>(gate)); + gate.await(); _activeFeedView->handlePruneRemovedDocuments(pruneOp); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index d717346883a..8c28fcdc1ea 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -65,7 +65,7 @@ private: _tls_mgr(tls_mgr), _tlsDirectWriter(tlsDirectWriter) { } - void storeOperation(const FeedOperation &op) override; + void storeOperation(const FeedOperation &op, DoneCallback onDone) override; bool erase(SerialNum oldest_to_keep) override; SerialNum sync(SerialNum syncTo) override; }; @@ -234,7 +234,8 @@ public: void eof() override; void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override; void syncTls(SerialNum syncTo); - void storeOperation(FeedOperation &op) override; + void storeOperation(const FeedOperation &op, DoneCallback onDone) override; + void storeOperationSync(const FeedOperation & op); void considerDelayedPrune(); }; diff --git a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h index 4e5958cd9e2..760250844a8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h @@ -2,6 +2,8 @@ #pragma once +#include <vespa/searchlib/transactionlog/common.h> + namespace proton { class FeedOperation; @@ -11,12 +13,13 @@ class FeedOperation; */ struct IOperationStorer { - virtual ~IOperationStorer() {} + using DoneCallback = search::transactionlog::Writer::DoneCallback; + virtual ~IOperationStorer() = default; /** * Assign serial number to (if not set) and store the given operation. */ - virtual void storeOperation(FeedOperation &op) = 0; + virtual void storeOperation(const FeedOperation &op, DoneCallback onDone) = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp index d6c1a032cea..2ae8d826ebc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp @@ -5,7 +5,8 @@ #include "imaintenancejobrunner.h" #include "lid_space_compaction_job.h" #include <vespa/searchcore/proton/common/eventlogger.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> +#include <vespa/vespalib/util/sync.h> #include <cassert> #include <vespa/log/log.h> @@ -55,8 +56,9 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats) return true; } else { MoveOperation::UP op = _handler.createMoveOperation(document, stats.getLowestFreeLid()); - _opStorer.storeOperation(*op); - _handler.handleMove(*op, _moveOpsLimiter->beginOperation()); + search::IDestructorCallback::SP context = _moveOpsLimiter->beginOperation(); + _opStorer.storeOperation(*op, context); + _handler.handleMove(*op, std::move(context)); if (isBlocked(BlockedReason::OUTSTANDING_OPS)) { return true; } @@ -79,7 +81,9 @@ LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats) { uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1; CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit); - _opStorer.storeOperation(op); + vespalib::Gate gate; + _opStorer.storeOperation(op, std::make_shared<search::GateCallback>(gate)); + gate.await(); _handler.handleCompactLidSpace(op); EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit); _shouldCompactLidSpace = false; diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp index 215650b6664..bfc59dee35e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp @@ -11,24 +11,24 @@ using search::transactionlog::Packet; namespace proton { -void TlcProxy::commit(search::SerialNum serialNum, search::transactionlog::Type type, const vespalib::nbostream &buf) +void TlcProxy::commit(search::SerialNum serialNum, search::transactionlog::Type type, + const vespalib::nbostream &buf, DoneCallback onDone) { Packet::Entry entry(serialNum, type, vespalib::ConstBufferRef(buf.c_str(), buf.size())); Packet packet; packet.add(entry); packet.close(); - _tlsDirectWriter.commit(_domain, packet); - + _tlsDirectWriter.commit(_domain, packet, std::move(onDone)); } void -TlcProxy::storeOperation(const FeedOperation &op) +TlcProxy::storeOperation(const FeedOperation &op, DoneCallback onDone) { nbostream stream; op.serialize(stream); LOG(debug, "storeOperation(): serialNum(%" PRIu64 "), type(%u), size(%zu)", op.getSerialNum(), (uint32_t)op.getType(), stream.size()); - commit(op.getSerialNum(), (uint32_t)op.getType(), stream); + commit(op.getSerialNum(), (uint32_t)op.getType(), stream, std::move(onDone)); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h index 8e4feb2f354..2dc6501731e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h @@ -8,18 +8,20 @@ namespace proton { class FeedOperation; class TlcProxy { - vespalib::string _domain; - search::transactionlog::Writer & _tlsDirectWriter; + using DoneCallback = search::transactionlog::Writer::DoneCallback; + using Writer = search::transactionlog::Writer; + vespalib::string _domain; + Writer & _tlsDirectWriter; - void commit(search::SerialNum serialNum, search::transactionlog::Type type, const vespalib::nbostream &buf); + void commit(search::SerialNum serialNum, search::transactionlog::Type type, + const vespalib::nbostream &buf, DoneCallback onDone); public: typedef std::unique_ptr<TlcProxy> UP; - TlcProxy(const vespalib::string & domain, search::transactionlog::Writer & writer) + TlcProxy(const vespalib::string & domain, Writer & writer) : _domain(domain), _tlsDirectWriter(writer) {} - void storeOperation(const FeedOperation &op); + void storeOperation(const FeedOperation &op, DoneCallback onDone); }; } // namespace proton - diff --git a/searchcore/src/vespa/searchcore/proton/server/tlswriter.h b/searchcore/src/vespa/searchcore/proton/server/tlswriter.h index 0956c0ae011..5d51580c0ad 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlswriter.h +++ b/searchcore/src/vespa/searchcore/proton/server/tlswriter.h @@ -2,19 +2,17 @@ #pragma once +#include "i_operation_storer.h" #include <vespa/searchlib/common/serialnum.h> namespace proton { -class FeedOperation; - /** * Interface for writing to the TransactionLogServer. */ -struct TlsWriter { - virtual ~TlsWriter() {} +struct TlsWriter : public IOperationStorer { + virtual ~TlsWriter() = default; - virtual void storeOperation(const FeedOperation &op) = 0; virtual bool erase(search::SerialNum oldest_to_keep) = 0; virtual search::SerialNum sync(search::SerialNum syncTo) = 0; }; |