diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-12 14:18:26 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-17 15:52:10 +0200 |
commit | b2de6349a5e71ac46b1bd7f38ff38c96f4b1a189 (patch) | |
tree | 25b6e907470902fbf507f4913bead957ec424a8a | |
parent | 4d335b9bde8f4c4b624471a5eb956af4ae6ba73e (diff) |
Introduce async write interface in the TLS.
Wire it in all the way up and in to proton.
The implementation is still synchronous.
20 files changed, 134 insertions, 99 deletions
diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index 3b199d266a8..bce3fb7267c 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -21,9 +21,8 @@ #include <vespa/searchcore/proton/server/memoryconfigstore.h> #include <vespa/searchcore/proton/server/searchview.h> #include <vespa/searchcore/proton/server/summaryadapter.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/searchlib/common/transport.h> -#include <vespa/searchlib/docstore/logdocumentstore.h> #include <vespa/searchlib/engine/docsumapi.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> @@ -254,7 +253,7 @@ public: op.setSerialNum(serialNum); op.setDbDocumentId(dbdId); op.setPrevDbDocumentId(prevDbdId); - _ddb->getFeedHandler().storeOperation(op); + _ddb->getFeedHandler().storeOperation(op, std::make_shared<search::IgnoreCallback>()); SearchView *sv(dynamic_cast<SearchView *>(_ddb->getReadySubDB()->getSearchView().get())); if (sv != NULL) { // cf. FeedView::putAttributes() diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 823c31dd1c2..b8ffc41d3cd 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -370,7 +370,7 @@ struct MyTlsWriter : TlsWriter { bool erase_return; MyTlsWriter() : store_count(0), erase_count(0), erase_return(true) {} - void storeOperation(const FeedOperation &) override { ++store_count; } + void storeOperation(const FeedOperation &, DoneCallback) override { ++store_count; } bool erase(SerialNum) override { ++erase_count; return erase_return; } SerialNum sync(SerialNum syncTo) override { diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp index 55f71da9687..56bd99c90f6 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp @@ -138,7 +138,7 @@ struct MyStorer : public IOperationStorer : _moveCnt(0), _compactCnt(0) {} - virtual void storeOperation(FeedOperation &op) override { + void storeOperation(const FeedOperation &op, DoneCallback) override { if (op.getType() == FeedOperation::MOVE) { ++ _moveCnt; } else if (op.getType() == FeedOperation::COMPACT_LID_SPACE) { diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 559dbb240a8..f20ad01bcf6 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -26,7 +26,7 @@ #include <vespa/searchcore/proton/test/test.h> #include <vespa/searchlib/attribute/attributecontext.h> #include <vespa/searchlib/attribute/attributeguard.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/searchlib/common/idocumentmetastore.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/vespalib/data/slime/slime.h> @@ -232,7 +232,7 @@ public: } // Implements IOperationStorer - virtual void storeOperation(FeedOperation &op) override; + virtual void storeOperation(const FeedOperation &op, DoneCallback) override; uint32_t getHeartBeats() { return _heartBeats; @@ -781,7 +781,6 @@ MyFeedHandler::isExecutorThread() void MyFeedHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx) { - (void) moveDoneCtx; assert(isExecutorThread()); assert(op.getValidPrevDbdId()); _subDBs[op.getSubDbId()]->prepareMove(op); @@ -792,7 +791,7 @@ MyFeedHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx assert(op.getPrevSubDbId() != 1u); assert(op.getSubDbId() < _subDBs.size()); assert(op.getPrevSubDbId() < _subDBs.size()); - storeOperation(op); + storeOperation(op, std::move(moveDoneCtx)); _subDBs[op.getSubDbId()]->handleMove(op); _subDBs[op.getPrevSubDbId()]->handleMove(op); } @@ -803,7 +802,7 @@ MyFeedHandler::performPruneRemovedDocuments(PruneRemovedDocumentsOperation &op) { assert(isExecutorThread()); if (op.getLidsToRemove()->getNumLids() != 0u) { - storeOperation(op); + storeOperation(op, std::make_shared<search::IgnoreCallback>()); // magic number. _subDBs[1u]->handlePruneRemovedDocuments(op); } @@ -826,9 +825,9 @@ MyFeedHandler::setSubDBs(const std::vector<MyDocumentSubDB *> &subDBs) void -MyFeedHandler::storeOperation(FeedOperation &op) +MyFeedHandler::storeOperation(const FeedOperation &op, DoneCallback) { - op.setSerialNum(incSerialNum()); + const_cast<FeedOperation &>(op).setSerialNum(incSerialNum()); } @@ -1011,22 +1010,16 @@ MaintenanceControllerFixture::performForwardMaintenanceConfig() void -MaintenanceControllerFixture::insertDocs(const test::UserDocuments &docs, - MyDocumentSubDB &subDb) +MaintenanceControllerFixture::insertDocs(const test::UserDocuments &docs, MyDocumentSubDB &subDb) { - for (test::UserDocuments::Iterator itr = docs.begin(); - itr != docs.end(); - ++itr) { + for (auto itr = docs.begin(); itr != docs.end(); ++itr) { const test::BucketDocuments &bucketDocs = itr->second; for (size_t i = 0; i < bucketDocs.getDocs().size(); ++i) { const test::Document &testDoc = bucketDocs.getDocs()[i]; - PutOperation op(testDoc.getBucket(), - testDoc.getTimestamp(), - testDoc.getDoc()); - op.setDbDocumentId(DbDocumentId(subDb.getSubDBId(), - testDoc.getLid())); - _fh.storeOperation(op); + PutOperation op(testDoc.getBucket(), testDoc.getTimestamp(), testDoc.getDoc()); + op.setDbDocumentId(DbDocumentId(subDb.getSubDBId(), testDoc.getLid())); + _fh.storeOperation(op, std::make_shared<search::IgnoreCallback>()); subDb.handlePut(op); } } @@ -1038,18 +1031,13 @@ MaintenanceControllerFixture::removeDocs(const test::UserDocuments &docs, Timestamp timestamp) { - for (test::UserDocuments::Iterator itr = docs.begin(); - itr != docs.end(); - ++itr) { + for (auto itr = docs.begin(); itr != docs.end(); ++itr) { const test::BucketDocuments &bucketDocs = itr->second; for (size_t i = 0; i < bucketDocs.getDocs().size(); ++i) { const test::Document &testDoc = bucketDocs.getDocs()[i]; - RemoveOperation op(testDoc.getBucket(), - timestamp, - testDoc.getDoc()->getId()); - op.setDbDocumentId(DbDocumentId(_removed.getSubDBId(), - testDoc.getLid())); - _fh.storeOperation(op); + RemoveOperation op(testDoc.getBucket(), timestamp, testDoc.getDoc()->getId()); + op.setDbDocumentId(DbDocumentId(_removed.getSubDBId(), testDoc.getLid())); + _fh.storeOperation(op, std::make_shared<search::IgnoreCallback>()); _removed.handleRemove(op); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 4198803d1fe..5babacfc4b6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -307,7 +307,7 @@ DocumentDB::enterReprocessState() if (!runner.empty()) { runner.run(); NoopOperation op; - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); _subDBs.pruneRemovedFields(op.getSerialNum()); } @@ -397,15 +397,14 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum _config_store->saveConfig(*configSnapshot, serialNum); // save entry in transaction log NewConfigOperation op(serialNum, *_config_store); - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); } bool hasVisibilityDelayChanged = false; { bool elidedConfigSave = equalReplayConfig && tlsReplayDone; // Flush changes to attributes and memory index, cf. visibilityDelay - _feedView.get()->forceCommit(elidedConfigSave ? serialNum : - serialNum - 1); + _feedView.get()->forceCommit(elidedConfigSave ? serialNum : serialNum - 1); _writeService.sync(); fastos::TimeStamp visibilityDelay = configSnapshot->getMaintenanceConfigSP()->getVisibilityDelay(); hasVisibilityDelayChanged = (visibilityDelay != _visibility.getVisibilityDelay()); @@ -585,7 +584,7 @@ DocumentDB::saveInitialConfig(const DocumentDBConfig &configSnapshot) // pruned at once anyway. // save noop entry in transaction log NoopOperation op; - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); // Wipe everything in transaction log before initial config. try { @@ -609,7 +608,7 @@ DocumentDB::resumeSaveConfig() SerialNum confSerial = _feedHandler.incSerialNum(); // resume operation, i.e. save config entry in transaction log NewConfigOperation op(confSerial, *_config_store); - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); } @@ -776,7 +775,7 @@ DocumentDB::enterRedoReprocessState() runner.run(); _subDBs.onReprocessDone(_feedHandler.getSerialNum()); NoopOperation op; - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); _subDBs.pruneRemovedFields(op.getSerialNum()); } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index b01ba43cb49..5890489415e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -12,7 +12,7 @@ #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> #include <vespa/searchcore/proton/persistenceengine/transport_latch.h> #include <vespa/searchcorespi/index/ithreadingservice.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/exceptions.h> #include <unistd.h> @@ -46,8 +46,8 @@ ignoreOperation(const DocumentOperation &op) { } // namespace -void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op) { - TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op); +void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op, DoneCallback onDone) { + TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op, std::move(onDone)); } bool FeedHandler::TlsMgrWriter::erase(SerialNum oldest_to_keep) { return _tls_mgr.getSession()->erase(oldest_to_keep); @@ -72,7 +72,6 @@ FeedHandler::TlsMgrWriter::sync(SerialNum syncTo) LOG(spam, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo); } throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo)); - return 0; } void @@ -94,7 +93,7 @@ void FeedHandler::performPut(FeedToken token, PutOperation &op) { } return; } - storeOperation(op); + storeOperation(op, token); if (token) { token->setResult(std::make_unique<Result>(), false); } @@ -121,7 +120,7 @@ FeedHandler::performUpdate(FeedToken token, UpdateOperation &op) void FeedHandler::performInternalUpdate(FeedToken token, UpdateOperation &op) { - storeOperation(op); + storeOperation(op, token); if (token) { token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true); } @@ -137,7 +136,7 @@ FeedHandler::createNonExistingDocument(FeedToken token, const UpdateOperation &o op.getUpdate()->applyTo(*doc); PutOperation putOp(op.getBucketId(), op.getTimestamp(), doc); _activeFeedView->preparePut(putOp); - storeOperation(putOp); + storeOperation(putOp, token); if (token) { token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true); } @@ -160,7 +159,7 @@ void FeedHandler::performRemove(FeedToken token, RemoveOperation &op) { if (op.getPrevDbDocumentId().valid()) { assert(op.getValidNewOrPrevDbdId()); assert(op.notMovingLidInSameSubDb()); - storeOperation(op); + storeOperation(op, token); if (token) { bool documentWasFound = !op.getPrevMarkedAsRemoved(); token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound); @@ -168,7 +167,7 @@ void FeedHandler::performRemove(FeedToken token, RemoveOperation &op) { _activeFeedView->handleRemove(std::move(token), op); } else if (op.hasDocType()) { assert(op.getDocType() == _docTypeName.getName()); - storeOperation(op); + storeOperation(op, token); if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); } @@ -186,20 +185,16 @@ FeedHandler::performGarbageCollect(FeedToken token) (void) token; } - void FeedHandler::performCreateBucket(FeedToken token, CreateBucketOperation &op) { - (void) token; - storeOperation(op); + storeOperation(op, std::move(token)); _bucketDBHandler->handleCreateBucket(op.getBucketId()); } - void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op) { - (void) token; _activeFeedView->prepareDeleteBucket(op); - storeOperation(op); + storeOperation(op, std::move(token)); // Delete documents in bucket _activeFeedView->handleDeleteBucket(op); // Delete bucket itself, should no longer have documents. @@ -207,21 +202,16 @@ void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op } - void FeedHandler::performSplit(FeedToken token, SplitBucketOperation &op) { - (void) token; - storeOperation(op); + storeOperation(op, std::move(token)); _bucketDBHandler->handleSplit(op.getSerialNum(), op.getSource(), op.getTarget1(), op.getTarget2()); } - void FeedHandler::performJoin(FeedToken token, JoinBucketsOperation &op) { - (void) token; - storeOperation(op); + storeOperation(op, std::move(token)); _bucketDBHandler->handleJoin(op.getSerialNum(), op.getSource1(), op.getSource2(), op.getTarget()); } - void FeedHandler::performSync() { @@ -412,18 +402,28 @@ FeedHandler::isDoingReplay() const { return _tlsMgr.isDoingReplay(); } -bool FeedHandler::getTransactionLogReplayDone() const { +bool +FeedHandler::getTransactionLogReplayDone() const { return _tlsMgr.getReplayDone(); } -void FeedHandler::storeOperation(FeedOperation &op) { +void +FeedHandler::storeOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) { if (!op.getSerialNum()) { - op.setSerialNum(incSerialNum()); + const_cast<FeedOperation &>(op).setSerialNum(incSerialNum()); } - _tlsWriter.storeOperation(op); + _tlsWriter.storeOperation(op, std::move(onDone)); } -void FeedHandler::tlsPrune(SerialNum oldest_to_keep) { +void +FeedHandler::storeOperationSync(const FeedOperation &op) { + vespalib::Gate gate; + storeOperation(op, std::make_shared<search::GateCallback>(gate)); + gate.await(); +} + +void +FeedHandler::tlsPrune(SerialNum oldest_to_keep) { if (!_tlsWriter.erase(oldest_to_keep)) { throw IllegalStateException(make_string("Failed to prune TLS to token %" PRIu64 ".", oldest_to_keep)); } @@ -533,7 +533,7 @@ FeedHandler::handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCa assert(op.getValidDbdId()); assert(op.getValidPrevDbdId()); assert(op.getSubDbId() != op.getPrevSubDbId()); - storeOperation(op); + storeOperation(op, moveDoneCtx); _activeFeedView->handleMove(op, std::move(moveDoneCtx)); } @@ -577,7 +577,9 @@ performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) { const LidVectorContext::SP lids_to_remove = pruneOp.getLidsToRemove(); if (lids_to_remove && lids_to_remove->getNumLids() != 0) { - storeOperation(pruneOp); + vespalib::Gate gate; + storeOperation(pruneOp, std::make_shared<search::GateCallback>(gate)); + gate.await(); _activeFeedView->handlePruneRemovedDocuments(pruneOp); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index d717346883a..8c28fcdc1ea 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -65,7 +65,7 @@ private: _tls_mgr(tls_mgr), _tlsDirectWriter(tlsDirectWriter) { } - void storeOperation(const FeedOperation &op) override; + void storeOperation(const FeedOperation &op, DoneCallback onDone) override; bool erase(SerialNum oldest_to_keep) override; SerialNum sync(SerialNum syncTo) override; }; @@ -234,7 +234,8 @@ public: void eof() override; void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override; void syncTls(SerialNum syncTo); - void storeOperation(FeedOperation &op) override; + void storeOperation(const FeedOperation &op, DoneCallback onDone) override; + void storeOperationSync(const FeedOperation & op); void considerDelayedPrune(); }; diff --git a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h index 4e5958cd9e2..760250844a8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h @@ -2,6 +2,8 @@ #pragma once +#include <vespa/searchlib/transactionlog/common.h> + namespace proton { class FeedOperation; @@ -11,12 +13,13 @@ class FeedOperation; */ struct IOperationStorer { - virtual ~IOperationStorer() {} + using DoneCallback = search::transactionlog::Writer::DoneCallback; + virtual ~IOperationStorer() = default; /** * Assign serial number to (if not set) and store the given operation. */ - virtual void storeOperation(FeedOperation &op) = 0; + virtual void storeOperation(const FeedOperation &op, DoneCallback onDone) = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp index d6c1a032cea..2ae8d826ebc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp @@ -5,7 +5,8 @@ #include "imaintenancejobrunner.h" #include "lid_space_compaction_job.h" #include <vespa/searchcore/proton/common/eventlogger.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> +#include <vespa/vespalib/util/sync.h> #include <cassert> #include <vespa/log/log.h> @@ -55,8 +56,9 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats) return true; } else { MoveOperation::UP op = _handler.createMoveOperation(document, stats.getLowestFreeLid()); - _opStorer.storeOperation(*op); - _handler.handleMove(*op, _moveOpsLimiter->beginOperation()); + search::IDestructorCallback::SP context = _moveOpsLimiter->beginOperation(); + _opStorer.storeOperation(*op, context); + _handler.handleMove(*op, std::move(context)); if (isBlocked(BlockedReason::OUTSTANDING_OPS)) { return true; } @@ -79,7 +81,9 @@ LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats) { uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1; CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit); - _opStorer.storeOperation(op); + vespalib::Gate gate; + _opStorer.storeOperation(op, std::make_shared<search::GateCallback>(gate)); + gate.await(); _handler.handleCompactLidSpace(op); EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit); _shouldCompactLidSpace = false; diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp index 215650b6664..bfc59dee35e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp @@ -11,24 +11,24 @@ using search::transactionlog::Packet; namespace proton { -void TlcProxy::commit(search::SerialNum serialNum, search::transactionlog::Type type, const vespalib::nbostream &buf) +void TlcProxy::commit(search::SerialNum serialNum, search::transactionlog::Type type, + const vespalib::nbostream &buf, DoneCallback onDone) { Packet::Entry entry(serialNum, type, vespalib::ConstBufferRef(buf.c_str(), buf.size())); Packet packet; packet.add(entry); packet.close(); - _tlsDirectWriter.commit(_domain, packet); - + _tlsDirectWriter.commit(_domain, packet, std::move(onDone)); } void -TlcProxy::storeOperation(const FeedOperation &op) +TlcProxy::storeOperation(const FeedOperation &op, DoneCallback onDone) { nbostream stream; op.serialize(stream); LOG(debug, "storeOperation(): serialNum(%" PRIu64 "), type(%u), size(%zu)", op.getSerialNum(), (uint32_t)op.getType(), stream.size()); - commit(op.getSerialNum(), (uint32_t)op.getType(), stream); + commit(op.getSerialNum(), (uint32_t)op.getType(), stream, std::move(onDone)); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h index 8e4feb2f354..2dc6501731e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h @@ -8,18 +8,20 @@ namespace proton { class FeedOperation; class TlcProxy { - vespalib::string _domain; - search::transactionlog::Writer & _tlsDirectWriter; + using DoneCallback = search::transactionlog::Writer::DoneCallback; + using Writer = search::transactionlog::Writer; + vespalib::string _domain; + Writer & _tlsDirectWriter; - void commit(search::SerialNum serialNum, search::transactionlog::Type type, const vespalib::nbostream &buf); + void commit(search::SerialNum serialNum, search::transactionlog::Type type, + const vespalib::nbostream &buf, DoneCallback onDone); public: typedef std::unique_ptr<TlcProxy> UP; - TlcProxy(const vespalib::string & domain, search::transactionlog::Writer & writer) + TlcProxy(const vespalib::string & domain, Writer & writer) : _domain(domain), _tlsDirectWriter(writer) {} - void storeOperation(const FeedOperation &op); + void storeOperation(const FeedOperation &op, DoneCallback onDone); }; } // namespace proton - diff --git a/searchcore/src/vespa/searchcore/proton/server/tlswriter.h b/searchcore/src/vespa/searchcore/proton/server/tlswriter.h index 0956c0ae011..5d51580c0ad 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlswriter.h +++ b/searchcore/src/vespa/searchcore/proton/server/tlswriter.h @@ -2,19 +2,17 @@ #pragma once +#include "i_operation_storer.h" #include <vespa/searchlib/common/serialnum.h> namespace proton { -class FeedOperation; - /** * Interface for writing to the TransactionLogServer. */ -struct TlsWriter { - virtual ~TlsWriter() {} +struct TlsWriter : public IOperationStorer { + virtual ~TlsWriter() = default; - virtual void storeOperation(const FeedOperation &op) = 0; virtual bool erase(search::SerialNum oldest_to_keep) = 0; virtual search::SerialNum sync(search::SerialNum syncTo) = 0; }; diff --git a/searchlib/src/vespa/searchlib/common/CMakeLists.txt b/searchlib/src/vespa/searchlib/common/CMakeLists.txt index b1f71303449..f9db738528c 100644 --- a/searchlib/src/vespa/searchlib/common/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/common/CMakeLists.txt @@ -13,6 +13,7 @@ vespa_add_library(searchlib_common OBJECT featureset.cpp fileheadercontext.cpp foregroundtaskexecutor.cpp + gatecallback.cpp growablebitvector.cpp indexmetainfo.cpp location.cpp diff --git a/searchlib/src/vespa/searchlib/common/gatecallback.cpp b/searchlib/src/vespa/searchlib/common/gatecallback.cpp new file mode 100644 index 00000000000..a853909be71 --- /dev/null +++ b/searchlib/src/vespa/searchlib/common/gatecallback.cpp @@ -0,0 +1,12 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "gatecallback.h" +#include <vespa/vespalib/util/sync.h> + +namespace search { + +GateCallback::~GateCallback() { + _gate.countDown(); +} + +} diff --git a/searchlib/src/vespa/searchlib/common/gatecallback.h b/searchlib/src/vespa/searchlib/common/gatecallback.h new file mode 100644 index 00000000000..1e85d796089 --- /dev/null +++ b/searchlib/src/vespa/searchlib/common/gatecallback.h @@ -0,0 +1,24 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "idestructorcallback.h" + +namespace vespalib { class Gate; } + +namespace search { + +class GateCallback : public IDestructorCallback { +public: + GateCallback(vespalib::Gate & gate) : _gate(gate) {} + ~GateCallback() override; +private: + vespalib::Gate & _gate; +}; + +class IgnoreCallback : public IDestructorCallback { +public: + IgnoreCallback() { } + ~IgnoreCallback() override = default; +}; + +} // namespace search diff --git a/searchlib/src/vespa/searchlib/common/idestructorcallback.h b/searchlib/src/vespa/searchlib/common/idestructorcallback.h index 4c42f68f0e4..77adba7a4cc 100644 --- a/searchlib/src/vespa/searchlib/common/idestructorcallback.h +++ b/searchlib/src/vespa/searchlib/common/idestructorcallback.h @@ -3,8 +3,7 @@ #include <memory> -namespace search -{ +namespace search { /** * Interface for class that performs a callback when instance is @@ -17,7 +16,7 @@ class IDestructorCallback { public: using SP = std::shared_ptr<IDestructorCallback>; - virtual ~IDestructorCallback() { } + virtual ~IDestructorCallback() = default; }; } // namespace search diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index 65ef8f363c0..db8b9727daa 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -2,6 +2,7 @@ #pragma once #include <vespa/searchlib/common/serialnum.h> +#include <vespa/searchlib/common/idestructorcallback.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/buffer.h> @@ -90,8 +91,9 @@ int makeDirectory(const char * dir); class Writer { public: + using DoneCallback = std::shared_ptr<IDestructorCallback>; virtual ~Writer() { } - virtual void commit(const vespalib::string & domainName, const Packet & packet) = 0; + virtual void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) = 0; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index ca17457bdb9..e793aafd38f 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -453,8 +453,9 @@ void TransLogServer::domainStatus(FRT_RPCRequest *req) } } -void TransLogServer::commit(const vespalib::string & domainName, const Packet & packet) +void TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) { + (void) done; Domain::SP domain(findDomain(domainName)); if (domain) { domain->commit(packet); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 92832786059..c12e37dd1c8 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -31,7 +31,7 @@ public: virtual ~TransLogServer(); DomainStats getDomainStats() const; - void commit(const vespalib::string & domainName, const Packet & packet) override; + void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override; class Session diff --git a/vespalib/src/vespa/vespalib/util/sync.h b/vespalib/src/vespa/vespalib/util/sync.h index f961c280174..86e0a227c72 100644 --- a/vespalib/src/vespa/vespalib/util/sync.h +++ b/vespalib/src/vespa/vespalib/util/sync.h @@ -710,7 +710,7 @@ public: /** * Empty. Needs to be virtual to reduce compiler warnings. **/ - virtual ~CountDownLatch() {} + virtual ~CountDownLatch() = default; }; |