diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-17 16:39:25 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-17 16:39:25 +0200 |
commit | 9685b09c1c6a7d898ef24c3a5b59ad03c6ca39ab (patch) | |
tree | 1bb00378f5f35e94f42d6a134c1db8de91b4ef97 /searchcore | |
parent | 0ef0c5f88f76e14ce8c594f39001f0dc61b81291 (diff) | |
parent | b2de6349a5e71ac46b1bd7f38ff38c96f4b1a189 (diff) |
Merge pull request #3792 from vespa-engine/balder/wire-in-a-destructor-callback
Introduce async write interface in the TLS.
Diffstat (limited to 'searchcore')
12 files changed, 88 insertions, 92 deletions
diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index 3b199d266a8..bce3fb7267c 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -21,9 +21,8 @@ #include <vespa/searchcore/proton/server/memoryconfigstore.h> #include <vespa/searchcore/proton/server/searchview.h> #include <vespa/searchcore/proton/server/summaryadapter.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/searchlib/common/transport.h> -#include <vespa/searchlib/docstore/logdocumentstore.h> #include <vespa/searchlib/engine/docsumapi.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> @@ -254,7 +253,7 @@ public: op.setSerialNum(serialNum); op.setDbDocumentId(dbdId); op.setPrevDbDocumentId(prevDbdId); - _ddb->getFeedHandler().storeOperation(op); + _ddb->getFeedHandler().storeOperation(op, std::make_shared<search::IgnoreCallback>()); SearchView *sv(dynamic_cast<SearchView *>(_ddb->getReadySubDB()->getSearchView().get())); if (sv != NULL) { // cf. FeedView::putAttributes() diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 823c31dd1c2..b8ffc41d3cd 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -370,7 +370,7 @@ struct MyTlsWriter : TlsWriter { bool erase_return; MyTlsWriter() : store_count(0), erase_count(0), erase_return(true) {} - void storeOperation(const FeedOperation &) override { ++store_count; } + void storeOperation(const FeedOperation &, DoneCallback) override { ++store_count; } bool erase(SerialNum) override { ++erase_count; return erase_return; } SerialNum sync(SerialNum syncTo) override { diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp index 55f71da9687..56bd99c90f6 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp @@ -138,7 +138,7 @@ struct MyStorer : public IOperationStorer : _moveCnt(0), _compactCnt(0) {} - virtual void storeOperation(FeedOperation &op) override { + void storeOperation(const FeedOperation &op, DoneCallback) override { if (op.getType() == FeedOperation::MOVE) { ++ _moveCnt; } else if (op.getType() == FeedOperation::COMPACT_LID_SPACE) { diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 559dbb240a8..f20ad01bcf6 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -26,7 +26,7 @@ #include <vespa/searchcore/proton/test/test.h> #include <vespa/searchlib/attribute/attributecontext.h> #include <vespa/searchlib/attribute/attributeguard.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/searchlib/common/idocumentmetastore.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/vespalib/data/slime/slime.h> @@ -232,7 +232,7 @@ public: } // Implements IOperationStorer - virtual void storeOperation(FeedOperation &op) override; + virtual void storeOperation(const FeedOperation &op, DoneCallback) override; uint32_t getHeartBeats() { return _heartBeats; @@ -781,7 +781,6 @@ MyFeedHandler::isExecutorThread() void MyFeedHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx) { - (void) moveDoneCtx; assert(isExecutorThread()); assert(op.getValidPrevDbdId()); _subDBs[op.getSubDbId()]->prepareMove(op); @@ -792,7 +791,7 @@ MyFeedHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx assert(op.getPrevSubDbId() != 1u); assert(op.getSubDbId() < _subDBs.size()); assert(op.getPrevSubDbId() < _subDBs.size()); - storeOperation(op); + storeOperation(op, std::move(moveDoneCtx)); _subDBs[op.getSubDbId()]->handleMove(op); _subDBs[op.getPrevSubDbId()]->handleMove(op); } @@ -803,7 +802,7 @@ MyFeedHandler::performPruneRemovedDocuments(PruneRemovedDocumentsOperation &op) { assert(isExecutorThread()); if (op.getLidsToRemove()->getNumLids() != 0u) { - storeOperation(op); + storeOperation(op, std::make_shared<search::IgnoreCallback>()); // magic number. _subDBs[1u]->handlePruneRemovedDocuments(op); } @@ -826,9 +825,9 @@ MyFeedHandler::setSubDBs(const std::vector<MyDocumentSubDB *> &subDBs) void -MyFeedHandler::storeOperation(FeedOperation &op) +MyFeedHandler::storeOperation(const FeedOperation &op, DoneCallback) { - op.setSerialNum(incSerialNum()); + const_cast<FeedOperation &>(op).setSerialNum(incSerialNum()); } @@ -1011,22 +1010,16 @@ MaintenanceControllerFixture::performForwardMaintenanceConfig() void -MaintenanceControllerFixture::insertDocs(const test::UserDocuments &docs, - MyDocumentSubDB &subDb) +MaintenanceControllerFixture::insertDocs(const test::UserDocuments &docs, MyDocumentSubDB &subDb) { - for (test::UserDocuments::Iterator itr = docs.begin(); - itr != docs.end(); - ++itr) { + for (auto itr = docs.begin(); itr != docs.end(); ++itr) { const test::BucketDocuments &bucketDocs = itr->second; for (size_t i = 0; i < bucketDocs.getDocs().size(); ++i) { const test::Document &testDoc = bucketDocs.getDocs()[i]; - PutOperation op(testDoc.getBucket(), - testDoc.getTimestamp(), - testDoc.getDoc()); - op.setDbDocumentId(DbDocumentId(subDb.getSubDBId(), - testDoc.getLid())); - _fh.storeOperation(op); + PutOperation op(testDoc.getBucket(), testDoc.getTimestamp(), testDoc.getDoc()); + op.setDbDocumentId(DbDocumentId(subDb.getSubDBId(), testDoc.getLid())); + _fh.storeOperation(op, std::make_shared<search::IgnoreCallback>()); subDb.handlePut(op); } } @@ -1038,18 +1031,13 @@ MaintenanceControllerFixture::removeDocs(const test::UserDocuments &docs, Timestamp timestamp) { - for (test::UserDocuments::Iterator itr = docs.begin(); - itr != docs.end(); - ++itr) { + for (auto itr = docs.begin(); itr != docs.end(); ++itr) { const test::BucketDocuments &bucketDocs = itr->second; for (size_t i = 0; i < bucketDocs.getDocs().size(); ++i) { const test::Document &testDoc = bucketDocs.getDocs()[i]; - RemoveOperation op(testDoc.getBucket(), - timestamp, - testDoc.getDoc()->getId()); - op.setDbDocumentId(DbDocumentId(_removed.getSubDBId(), - testDoc.getLid())); - _fh.storeOperation(op); + RemoveOperation op(testDoc.getBucket(), timestamp, testDoc.getDoc()->getId()); + op.setDbDocumentId(DbDocumentId(_removed.getSubDBId(), testDoc.getLid())); + _fh.storeOperation(op, std::make_shared<search::IgnoreCallback>()); _removed.handleRemove(op); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 4198803d1fe..5babacfc4b6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -307,7 +307,7 @@ DocumentDB::enterReprocessState() if (!runner.empty()) { runner.run(); NoopOperation op; - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); _subDBs.pruneRemovedFields(op.getSerialNum()); } @@ -397,15 +397,14 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum _config_store->saveConfig(*configSnapshot, serialNum); // save entry in transaction log NewConfigOperation op(serialNum, *_config_store); - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); } bool hasVisibilityDelayChanged = false; { bool elidedConfigSave = equalReplayConfig && tlsReplayDone; // Flush changes to attributes and memory index, cf. visibilityDelay - _feedView.get()->forceCommit(elidedConfigSave ? serialNum : - serialNum - 1); + _feedView.get()->forceCommit(elidedConfigSave ? serialNum : serialNum - 1); _writeService.sync(); fastos::TimeStamp visibilityDelay = configSnapshot->getMaintenanceConfigSP()->getVisibilityDelay(); hasVisibilityDelayChanged = (visibilityDelay != _visibility.getVisibilityDelay()); @@ -585,7 +584,7 @@ DocumentDB::saveInitialConfig(const DocumentDBConfig &configSnapshot) // pruned at once anyway. // save noop entry in transaction log NoopOperation op; - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); // Wipe everything in transaction log before initial config. try { @@ -609,7 +608,7 @@ DocumentDB::resumeSaveConfig() SerialNum confSerial = _feedHandler.incSerialNum(); // resume operation, i.e. save config entry in transaction log NewConfigOperation op(confSerial, *_config_store); - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); } @@ -776,7 +775,7 @@ DocumentDB::enterRedoReprocessState() runner.run(); _subDBs.onReprocessDone(_feedHandler.getSerialNum()); NoopOperation op; - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); _subDBs.pruneRemovedFields(op.getSerialNum()); } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index b01ba43cb49..5890489415e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -12,7 +12,7 @@ #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> #include <vespa/searchcore/proton/persistenceengine/transport_latch.h> #include <vespa/searchcorespi/index/ithreadingservice.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/exceptions.h> #include <unistd.h> @@ -46,8 +46,8 @@ ignoreOperation(const DocumentOperation &op) { } // namespace -void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op) { - TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op); +void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op, DoneCallback onDone) { + TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op, std::move(onDone)); } bool FeedHandler::TlsMgrWriter::erase(SerialNum oldest_to_keep) { return _tls_mgr.getSession()->erase(oldest_to_keep); @@ -72,7 +72,6 @@ FeedHandler::TlsMgrWriter::sync(SerialNum syncTo) LOG(spam, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo); } throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo)); - return 0; } void @@ -94,7 +93,7 @@ void FeedHandler::performPut(FeedToken token, PutOperation &op) { } return; } - storeOperation(op); + storeOperation(op, token); if (token) { token->setResult(std::make_unique<Result>(), false); } @@ -121,7 +120,7 @@ FeedHandler::performUpdate(FeedToken token, UpdateOperation &op) void FeedHandler::performInternalUpdate(FeedToken token, UpdateOperation &op) { - storeOperation(op); + storeOperation(op, token); if (token) { token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true); } @@ -137,7 +136,7 @@ FeedHandler::createNonExistingDocument(FeedToken token, const UpdateOperation &o op.getUpdate()->applyTo(*doc); PutOperation putOp(op.getBucketId(), op.getTimestamp(), doc); _activeFeedView->preparePut(putOp); - storeOperation(putOp); + storeOperation(putOp, token); if (token) { token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true); } @@ -160,7 +159,7 @@ void FeedHandler::performRemove(FeedToken token, RemoveOperation &op) { if (op.getPrevDbDocumentId().valid()) { assert(op.getValidNewOrPrevDbdId()); assert(op.notMovingLidInSameSubDb()); - storeOperation(op); + storeOperation(op, token); if (token) { bool documentWasFound = !op.getPrevMarkedAsRemoved(); token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound); @@ -168,7 +167,7 @@ void FeedHandler::performRemove(FeedToken token, RemoveOperation &op) { _activeFeedView->handleRemove(std::move(token), op); } else if (op.hasDocType()) { assert(op.getDocType() == _docTypeName.getName()); - storeOperation(op); + storeOperation(op, token); if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); } @@ -186,20 +185,16 @@ FeedHandler::performGarbageCollect(FeedToken token) (void) token; } - void FeedHandler::performCreateBucket(FeedToken token, CreateBucketOperation &op) { - (void) token; - storeOperation(op); + storeOperation(op, std::move(token)); _bucketDBHandler->handleCreateBucket(op.getBucketId()); } - void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op) { - (void) token; _activeFeedView->prepareDeleteBucket(op); - storeOperation(op); + storeOperation(op, std::move(token)); // Delete documents in bucket _activeFeedView->handleDeleteBucket(op); // Delete bucket itself, should no longer have documents. @@ -207,21 +202,16 @@ void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op } - void FeedHandler::performSplit(FeedToken token, SplitBucketOperation &op) { - (void) token; - storeOperation(op); + storeOperation(op, std::move(token)); _bucketDBHandler->handleSplit(op.getSerialNum(), op.getSource(), op.getTarget1(), op.getTarget2()); } - void FeedHandler::performJoin(FeedToken token, JoinBucketsOperation &op) { - (void) token; - storeOperation(op); + storeOperation(op, std::move(token)); _bucketDBHandler->handleJoin(op.getSerialNum(), op.getSource1(), op.getSource2(), op.getTarget()); } - void FeedHandler::performSync() { @@ -412,18 +402,28 @@ FeedHandler::isDoingReplay() const { return _tlsMgr.isDoingReplay(); } -bool FeedHandler::getTransactionLogReplayDone() const { +bool +FeedHandler::getTransactionLogReplayDone() const { return _tlsMgr.getReplayDone(); } -void FeedHandler::storeOperation(FeedOperation &op) { +void +FeedHandler::storeOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) { if (!op.getSerialNum()) { - op.setSerialNum(incSerialNum()); + const_cast<FeedOperation &>(op).setSerialNum(incSerialNum()); } - _tlsWriter.storeOperation(op); + _tlsWriter.storeOperation(op, std::move(onDone)); } -void FeedHandler::tlsPrune(SerialNum oldest_to_keep) { +void +FeedHandler::storeOperationSync(const FeedOperation &op) { + vespalib::Gate gate; + storeOperation(op, std::make_shared<search::GateCallback>(gate)); + gate.await(); +} + +void +FeedHandler::tlsPrune(SerialNum oldest_to_keep) { if (!_tlsWriter.erase(oldest_to_keep)) { throw IllegalStateException(make_string("Failed to prune TLS to token %" PRIu64 ".", oldest_to_keep)); } @@ -533,7 +533,7 @@ FeedHandler::handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCa assert(op.getValidDbdId()); assert(op.getValidPrevDbdId()); assert(op.getSubDbId() != op.getPrevSubDbId()); - storeOperation(op); + storeOperation(op, moveDoneCtx); _activeFeedView->handleMove(op, std::move(moveDoneCtx)); } @@ -577,7 +577,9 @@ performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) { const LidVectorContext::SP lids_to_remove = pruneOp.getLidsToRemove(); if (lids_to_remove && lids_to_remove->getNumLids() != 0) { - storeOperation(pruneOp); + vespalib::Gate gate; + storeOperation(pruneOp, std::make_shared<search::GateCallback>(gate)); + gate.await(); _activeFeedView->handlePruneRemovedDocuments(pruneOp); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index d717346883a..8c28fcdc1ea 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -65,7 +65,7 @@ private: _tls_mgr(tls_mgr), _tlsDirectWriter(tlsDirectWriter) { } - void storeOperation(const FeedOperation &op) override; + void storeOperation(const FeedOperation &op, DoneCallback onDone) override; bool erase(SerialNum oldest_to_keep) override; SerialNum sync(SerialNum syncTo) override; }; @@ -234,7 +234,8 @@ public: void eof() override; void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override; void syncTls(SerialNum syncTo); - void storeOperation(FeedOperation &op) override; + void storeOperation(const FeedOperation &op, DoneCallback onDone) override; + void storeOperationSync(const FeedOperation & op); void considerDelayedPrune(); }; diff --git a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h index 4e5958cd9e2..760250844a8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h @@ -2,6 +2,8 @@ #pragma once +#include <vespa/searchlib/transactionlog/common.h> + namespace proton { class FeedOperation; @@ -11,12 +13,13 @@ class FeedOperation; */ struct IOperationStorer { - virtual ~IOperationStorer() {} + using DoneCallback = search::transactionlog::Writer::DoneCallback; + virtual ~IOperationStorer() = default; /** * Assign serial number to (if not set) and store the given operation. */ - virtual void storeOperation(FeedOperation &op) = 0; + virtual void storeOperation(const FeedOperation &op, DoneCallback onDone) = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp index d6c1a032cea..2ae8d826ebc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp @@ -5,7 +5,8 @@ #include "imaintenancejobrunner.h" #include "lid_space_compaction_job.h" #include <vespa/searchcore/proton/common/eventlogger.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> +#include <vespa/vespalib/util/sync.h> #include <cassert> #include <vespa/log/log.h> @@ -55,8 +56,9 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats) return true; } else { MoveOperation::UP op = _handler.createMoveOperation(document, stats.getLowestFreeLid()); - _opStorer.storeOperation(*op); - _handler.handleMove(*op, _moveOpsLimiter->beginOperation()); + search::IDestructorCallback::SP context = _moveOpsLimiter->beginOperation(); + _opStorer.storeOperation(*op, context); + _handler.handleMove(*op, std::move(context)); if (isBlocked(BlockedReason::OUTSTANDING_OPS)) { return true; } @@ -79,7 +81,9 @@ LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats) { uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1; CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit); - _opStorer.storeOperation(op); + vespalib::Gate gate; + _opStorer.storeOperation(op, std::make_shared<search::GateCallback>(gate)); + gate.await(); _handler.handleCompactLidSpace(op); EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit); _shouldCompactLidSpace = false; diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp index 215650b6664..bfc59dee35e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp @@ -11,24 +11,24 @@ using search::transactionlog::Packet; namespace proton { -void TlcProxy::commit(search::SerialNum serialNum, search::transactionlog::Type type, const vespalib::nbostream &buf) +void TlcProxy::commit(search::SerialNum serialNum, search::transactionlog::Type type, + const vespalib::nbostream &buf, DoneCallback onDone) { Packet::Entry entry(serialNum, type, vespalib::ConstBufferRef(buf.c_str(), buf.size())); Packet packet; packet.add(entry); packet.close(); - _tlsDirectWriter.commit(_domain, packet); - + _tlsDirectWriter.commit(_domain, packet, std::move(onDone)); } void -TlcProxy::storeOperation(const FeedOperation &op) +TlcProxy::storeOperation(const FeedOperation &op, DoneCallback onDone) { nbostream stream; op.serialize(stream); LOG(debug, "storeOperation(): serialNum(%" PRIu64 "), type(%u), size(%zu)", op.getSerialNum(), (uint32_t)op.getType(), stream.size()); - commit(op.getSerialNum(), (uint32_t)op.getType(), stream); + commit(op.getSerialNum(), (uint32_t)op.getType(), stream, std::move(onDone)); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h index 8e4feb2f354..2dc6501731e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h @@ -8,18 +8,20 @@ namespace proton { class FeedOperation; class TlcProxy { - vespalib::string _domain; - search::transactionlog::Writer & _tlsDirectWriter; + using DoneCallback = search::transactionlog::Writer::DoneCallback; + using Writer = search::transactionlog::Writer; + vespalib::string _domain; + Writer & _tlsDirectWriter; - void commit(search::SerialNum serialNum, search::transactionlog::Type type, const vespalib::nbostream &buf); + void commit(search::SerialNum serialNum, search::transactionlog::Type type, + const vespalib::nbostream &buf, DoneCallback onDone); public: typedef std::unique_ptr<TlcProxy> UP; - TlcProxy(const vespalib::string & domain, search::transactionlog::Writer & writer) + TlcProxy(const vespalib::string & domain, Writer & writer) : _domain(domain), _tlsDirectWriter(writer) {} - void storeOperation(const FeedOperation &op); + void storeOperation(const FeedOperation &op, DoneCallback onDone); }; } // namespace proton - diff --git a/searchcore/src/vespa/searchcore/proton/server/tlswriter.h b/searchcore/src/vespa/searchcore/proton/server/tlswriter.h index 0956c0ae011..5d51580c0ad 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlswriter.h +++ b/searchcore/src/vespa/searchcore/proton/server/tlswriter.h @@ -2,19 +2,17 @@ #pragma once +#include "i_operation_storer.h" #include <vespa/searchlib/common/serialnum.h> namespace proton { -class FeedOperation; - /** * Interface for writing to the TransactionLogServer. */ -struct TlsWriter { - virtual ~TlsWriter() {} +struct TlsWriter : public IOperationStorer { + virtual ~TlsWriter() = default; - virtual void storeOperation(const FeedOperation &op) = 0; virtual bool erase(search::SerialNum oldest_to_keep) = 0; virtual search::SerialNum sync(search::SerialNum syncTo) = 0; }; |