diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-12 11:08:43 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-12 11:08:43 +0200 |
commit | 4dae8f7765189e999e9c3010ab8b55eea9d3ca40 (patch) | |
tree | 5f274b06cb759d8defa309381d117f65d8b93737 /searchcore/src | |
parent | eb92ec91d4ddcaf25ffd8a08e73addd8846d7ad8 (diff) | |
parent | 2d47474eccd440c677b8b9f5ab149528d9bdc0f5 (diff) |
Merge pull request #3719 from vespa-engine/balder/remove-subscribe-interface
Remove subscribe to TLS.
Diffstat (limited to 'searchcore/src')
7 files changed, 100 insertions, 326 deletions
diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp index 6d57795a734..17f1faffbba 100644 --- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp +++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp @@ -315,7 +315,6 @@ public: } return RPC::OK; } - virtual void inSync() override { } virtual void eof() override { _eof = true; } bool isEof() const { return _eof; } }; diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h index 6bca156ddba..fd97cc6242a 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h @@ -8,9 +8,7 @@ #include <vespa/searchcore/proton/feedoperation/feedoperation.h> #include <atomic> -namespace proton -{ - +namespace proton { class PerDocTypeFeedMetrics; typedef std::unique_ptr<storage::spi::Result> ResultUP; @@ -38,7 +36,6 @@ private: State & operator = (const State &) = delete; State(ITransport & transport, mbus::Reply::UP reply, uint32_t numAcksRequired); ~State(); - void setNumAcksRequired(uint32_t numAcksRequired) { _unAckedCount = numAcksRequired; } void ack(); void ack(const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics); @@ -81,6 +78,12 @@ public: */ FeedToken(ITransport &transport, mbus::Reply::UP reply); + FeedToken(FeedToken &&) = default; + FeedToken & operator =(FeedToken &&) = default; + FeedToken(const FeedToken &) = default; + FeedToken & operator =(const FeedToken &) = default; + ~FeedToken() = default; + /** * Passes a receipt back to the originating FeedEngine, declaring that this * operation succeeded. If an error occured while processing the operation, @@ -88,15 +91,11 @@ public: */ void ack() const { _state->ack(); } - void - ack(const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics) const - { + void ack(const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics) const { _state->ack(opType, metrics); } - void - incNeededAcks() const - { + void incNeededAcks() const { _state->incNeededAcks(); } @@ -148,14 +147,6 @@ public: _state->setResult(std::move(result), documentWasFound); } - /** - * This controls how many acks are required before it is acked back to the sender. - * Default is 1, and so far only adjusted by multioperation handling. - * - * @param numAcksRequired How many acks must be received before it is considered acked. - */ - void setNumAcksRequired(uint32_t numAcksRequired) const { _state->setNumAcksRequired(numAcksRequired); } - FastOS_Time getStartTime() const { return _state->getStartTime(); } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 0ed3de93965..7cb68a8721b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -18,7 +18,6 @@ #include <vespa/searchcore/proton/persistenceengine/transport_latch.h> #include <vespa/searchcorespi/index/ithreadingservice.h> #include <vespa/searchlib/common/idestructorcallback.h> -#include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/exceptions.h> #include <unistd.h> @@ -41,47 +40,40 @@ using storage::spi::Timestamp; using storage::spi::UpdateResult; using vespalib::Executor; using vespalib::IllegalStateException; -using vespalib::makeClosure; -using vespalib::makeTask; +using vespalib::makeLambdaTask; using vespalib::make_string; using vespalib::MonitorGuard; using vespalib::LockGuard; namespace proton { - namespace { + void setUpdateWasFound(mbus::Reply &reply, bool was_found) { - assert(static_cast<DocumentReply&>(reply).getType() == - DocumentProtocol::REPLY_UPDATEDOCUMENT); + assert(static_cast<DocumentReply&>(reply).getType() == DocumentProtocol::REPLY_UPDATEDOCUMENT); UpdateDocumentReply &update_rep = static_cast<UpdateDocumentReply&>(reply); update_rep.setWasFound(was_found); } - void setRemoveWasFound(mbus::Reply &reply, bool was_found) { - assert(static_cast<DocumentReply&>(reply).getType() == - DocumentProtocol::REPLY_REMOVEDOCUMENT); + assert(static_cast<DocumentReply&>(reply).getType() == DocumentProtocol::REPLY_REMOVEDOCUMENT); RemoveDocumentReply &remove_rep = static_cast<RemoveDocumentReply&>(reply); remove_rep.setWasFound(was_found); } - bool ignoreOperation(const DocumentOperation &op) { - return op.getPrevTimestamp() != 0 && - op.getTimestamp() < op.getPrevTimestamp(); + return (op.getPrevTimestamp() != 0) + && (op.getTimestamp() < op.getPrevTimestamp()); } - } // namespace - void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op) { TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op); } @@ -93,7 +85,6 @@ search::SerialNum FeedHandler::TlsMgrWriter::sync(SerialNum syncTo) { for (int retryCount = 0; retryCount < 10; ++retryCount) { - SerialNum syncedTo(0); LOG(spam, "Trying tls sync(%" PRIu64 ")", syncTo); bool res = _tls_mgr.getSession()->sync(syncTo, syncedTo); @@ -103,19 +94,12 @@ FeedHandler::TlsMgrWriter::sync(SerialNum syncTo) continue; } if (syncedTo >= syncTo) { - LOG(spam, - "Tls sync complete, reached %" PRIu64", returning", - syncedTo); + LOG(spam, "Tls sync complete, reached %" PRIu64", returning", syncedTo); return syncedTo; } - LOG(spam, - "Tls sync incomplete, reached %" PRIu64 ", retrying", - syncedTo); + LOG(spam, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo); } - throw vespalib::IllegalStateException( - vespalib::make_string( - "Failed to sync TLS to token %" PRIu64 ".", - syncTo)); + throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo)); return 0; } @@ -123,7 +107,7 @@ void FeedHandler::doHandleOperation(FeedToken token, FeedOperation::UP op) { assert(_writeService.master().isCurrentThread()); - vespalib::LockGuard guard(_feedLock); + LockGuard guard(_feedLock); _feedState->handleOperation(token, std::move(op)); } @@ -131,33 +115,23 @@ void FeedHandler::performPut(FeedToken::UP token, PutOperation &op) { op.assertValid(); _activeFeedView->preparePut(op); if (ignoreOperation(op)) { - LOG(debug, "performPut(): ignoreOperation: docId(%s), " - "timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")", - op.getDocument()->getId().toString().c_str(), - (uint64_t)op.getTimestamp(), - (uint64_t)op.getPrevTimestamp()); - if (token.get() != NULL) { + LOG(debug, "performPut(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")", + op.getDocument()->getId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp()); + if (token) { token->setResult(ResultUP(new Result), false); token->ack(op.getType(), _metrics); } return; } storeOperation(op); - if (token.get() != NULL) { + if (token) { token->setResult(ResultUP(new Result), false); if (token->shouldTrace(1)) { const document::DocumentId &docId = op.getDocument()->getId(); const document::GlobalId &gid = docId.getGlobalId(); - token->trace(1, - make_string( - "Indexing document '%s' (gid = '%s'," - " lid = '%u,%u' prevlid = '%u,%u').", - docId.toString().c_str(), - gid.toString().c_str(), - op.getSubDbId(), - op.getLid(), - op.getPrevSubDbId(), - op.getPrevLid())); + token->trace(1, make_string("Indexing document '%s' (gid = '%s',lid = '%u,%u' prevlid = '%u,%u').", + docId.toString().c_str(), gid.toString().c_str(), op.getSubDbId(), + op.getLid(), op.getPrevSubDbId(), op.getPrevLid())); } } _activeFeedView->handlePut(token.get(), op); @@ -173,15 +147,12 @@ FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op) } else if (op.getUpdate()->getCreateIfNonExistent()) { createNonExistingDocument(std::move(token), op); } else { - if (token.get() != NULL) { + if (token) { token->setResult(ResultUP(new UpdateResult(Timestamp(0))), false); if (token->shouldTrace(1)) { const document::DocumentId &docId = op.getUpdate()->getId(); - token->trace(1, - make_string( - "Document '%s' not found." - " Update operation ignored", - docId.toString().c_str())); + token->trace(1, make_string("Document '%s' not found. Update operation ignored", + docId.toString().c_str())); } setUpdateWasFound(token->getReply(), false); token->ack(op.getType(), _metrics); @@ -194,22 +165,14 @@ void FeedHandler::performInternalUpdate(FeedToken::UP token, UpdateOperation &op) { storeOperation(op); - if (token.get() != NULL) { - token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), - true); + if (token) { + token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true); if (token->shouldTrace(1)) { const document::DocumentId &docId = op.getUpdate()->getId(); const document::GlobalId &gid = docId.getGlobalId(); - token->trace(1, - make_string( - "Updating document '%s' (gid = '%s'," - " lid = '%u,%u' prevlid = '%u,%u').", - docId.toString().c_str(), - gid.toString().c_str(), - op.getSubDbId(), - op.getLid(), - op.getPrevSubDbId(), - op.getPrevLid())); + token->trace(1, make_string("Updating document '%s' (gid = '%s', lid = '%u,%u' prevlid = '%u,%u').", + docId.toString().c_str(), gid.toString().c_str(), op.getSubDbId(), + op.getLid(), op.getPrevSubDbId(), op.getPrevLid())); } setUpdateWasFound(token->getReply(), true); } @@ -226,7 +189,7 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio PutOperation putOp(op.getBucketId(), op.getTimestamp(), doc); _activeFeedView->preparePut(putOp); storeOperation(putOp); - if (token.get() != NULL) { + if (token) { token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true); if (token->shouldTrace(1)) { const document::DocumentId &docId = putOp.getDocument()->getId(); @@ -246,7 +209,7 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio FeedToken putToken(latch, mbus::Reply::UP(new FeedReply(DocumentProtocol::REPLY_PUTDOCUMENT))); _activeFeedView->handlePut(&putToken, putOp); latch.await(); - if (token.get() != NULL) { + if (token) { token->ack(); } } @@ -255,12 +218,9 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { _activeFeedView->prepareRemove(op); if (ignoreOperation(op)) { - LOG(debug, "performRemove(): ignoreOperation: docId(%s), " - "timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")", - op.getDocumentId().toString().c_str(), - (uint64_t)op.getTimestamp(), - (uint64_t)op.getPrevTimestamp()); - if (token.get() != NULL) { + LOG(debug, "performRemove(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")", + op.getDocumentId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp()); + if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); token->ack(op.getType(), _metrics); } @@ -270,23 +230,15 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { assert(op.getValidNewOrPrevDbdId()); assert(op.notMovingLidInSameSubDb()); storeOperation(op); - if (token.get() != NULL) { + if (token) { bool documentWasFound = !op.getPrevMarkedAsRemoved(); - token->setResult(ResultUP(new RemoveResult(documentWasFound)), - documentWasFound); + token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound); if (token->shouldTrace(1)) { const document::DocumentId &docId = op.getDocumentId(); const document::GlobalId &gid = docId.getGlobalId(); - token->trace(1, - make_string( - "Removing document '%s' (gid = '%s'," - " lid = '%u,%u' prevlid = '%u,%u').", - docId.toString().c_str(), - gid.toString().c_str(), - op.getSubDbId(), - op.getLid(), - op.getPrevSubDbId(), - op.getPrevLid())); + token->trace(1, make_string("Removing document '%s' (gid = '%s', lid = '%u,%u' prevlid = '%u,%u').", + docId.toString().c_str(), gid.toString().c_str(), op.getSubDbId(), + op.getLid(), op.getPrevSubDbId(), op.getPrevLid())); } setRemoveWasFound(token->getReply(), documentWasFound); } @@ -294,27 +246,21 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { } else if (op.hasDocType()) { assert(op.getDocType() == _docTypeName.getName()); storeOperation(op); - if (token.get() != NULL) { + if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); if (token->shouldTrace(1)) { - token->trace(1, - make_string( - "Document '%s' not found." - " Remove operation stored.", - op.getDocumentId().toString().c_str())); + token->trace(1, make_string("Document '%s' not found. Remove operation stored.", + op.getDocumentId().toString().c_str())); } setRemoveWasFound(token->getReply(), false); } _activeFeedView->handleRemove(token.get(), op); } else { - if (token.get() != NULL) { + if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); if (token->shouldTrace(1)) { - token->trace(1, - make_string( - "Document '%s' not found." - " Remove operation ignored", - op.getDocumentId().toString().c_str())); + token->trace(1, make_string("Document '%s' not found. Remove operation ignored", + op.getDocumentId().toString().c_str())); } setRemoveWasFound(token->getReply(), false); token->ack(op.getType(), _metrics); @@ -325,15 +271,14 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { void FeedHandler::performGarbageCollect(FeedToken::UP token) { - if (token.get() != NULL) { + if (token) { token->ack(); } } void -FeedHandler::performCreateBucket(FeedToken::UP token, - CreateBucketOperation &op) +FeedHandler::performCreateBucket(FeedToken::UP token, CreateBucketOperation &op) { storeOperation(op); _bucketDBHandler->handleCreateBucket(op.getBucketId()); @@ -343,8 +288,7 @@ FeedHandler::performCreateBucket(FeedToken::UP token, } -void FeedHandler::performDeleteBucket(FeedToken::UP token, - DeleteBucketOperation &op) { +void FeedHandler::performDeleteBucket(FeedToken::UP token, DeleteBucketOperation &op) { _activeFeedView->prepareDeleteBucket(op); storeOperation(op); // Delete documents in bucket @@ -359,10 +303,7 @@ void FeedHandler::performDeleteBucket(FeedToken::UP token, void FeedHandler::performSplit(FeedToken::UP token, SplitBucketOperation &op) { storeOperation(op); - _bucketDBHandler->handleSplit(op.getSerialNum(), - op.getSource(), - op.getTarget1(), - op.getTarget2()); + _bucketDBHandler->handleSplit(op.getSerialNum(), op.getSource(), op.getTarget1(), op.getTarget2()); if (token) { token->ack(); } @@ -371,10 +312,7 @@ void FeedHandler::performSplit(FeedToken::UP token, SplitBucketOperation &op) { void FeedHandler::performJoin(FeedToken::UP token, JoinBucketsOperation &op) { storeOperation(op); - _bucketDBHandler->handleJoin(op.getSerialNum(), - op.getSource1(), - op.getSource2(), - op.getTarget()); + _bucketDBHandler->handleJoin(op.getSerialNum(), op.getSource1(), op.getSource2(), op.getTarget()); if (token) { token->ack(); } @@ -393,9 +331,7 @@ FeedHandler::performEof() { assert(_writeService.master().isCurrentThread()); _writeService.sync(); - LOG(debug, - "Visiting done for transaction log domain '%s', eof received", - _tlsMgr.getDomainName().c_str()); + LOG(debug, "Visiting done for transaction log domain '%s', eof received", _tlsMgr.getDomainName().c_str()); _owner.onTransactionLogReplayDone(); _tlsMgr.replayDone(); changeToNormalFeedState(); @@ -430,7 +366,7 @@ FeedHandler::performPrune(SerialNum flushedSerial) tlsPrune(flushedSerial); // throws on error LOG(debug, "Pruned TLS to token %" PRIu64 ".", flushedSerial); _owner.onPerformPrune(flushedSerial); - } catch (const vespalib::IllegalStateException & e) { + } catch (const IllegalStateException & e) { LOG(warning, "FeedHandler::performPrune failed due to '%s'.", e.what()); } } @@ -451,7 +387,7 @@ FeedHandler::getFeedState() const { FeedState::SP state; { - vespalib::LockGuard guard(_feedLock); + LockGuard guard(_feedLock); state = _feedState; } return state; @@ -461,17 +397,15 @@ FeedHandler::getFeedState() const void FeedHandler::changeFeedState(FeedState::SP newState) { - vespalib::LockGuard guard(_feedLock); + LockGuard guard(_feedLock); changeFeedState(newState, guard); } void -FeedHandler::changeFeedState(FeedState::SP newState, const vespalib::LockGuard &) +FeedHandler::changeFeedState(FeedState::SP newState, const LockGuard &) { - LOG(debug, - "Change feed state from '%s' -> '%s'", - _feedState->getName().c_str(), newState->getName().c_str()); + LOG(debug, "Change feed state from '%s' -> '%s'", _feedState->getName().c_str(), newState->getName().c_str()); _feedState = newState; } @@ -513,14 +447,10 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _syncLock(), _syncedSerialNum(0), _allowSync(false) -{ -} - +{ } -FeedHandler::~FeedHandler() -{ -} +FeedHandler::~FeedHandler() = default; // Called on DocumentDB creatio void @@ -542,70 +472,40 @@ FeedHandler::close() _tlsMgr.close(); } - void -FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, - SerialNum flushedSummaryMgrSerial, - SerialNum oldestFlushedSerial, - SerialNum newestFlushedSerial, +FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flushedSummaryMgrSerial, + SerialNum oldestFlushedSerial, SerialNum newestFlushedSerial, ConfigStore &config_store) { (void) newestFlushedSerial; assert(_activeFeedView); assert(_bucketDBHandler); FeedState::SP state = std::make_shared<ReplayTransactionLogState> - (getDocTypeName(), - _activeFeedView, - *_bucketDBHandler, - _replayConfig, - config_store); + (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store); changeFeedState(state); // Resurrected attribute vector might cause oldestFlushedSerial to // be lower than _prunedSerialNum, so don't warn for now. (void) oldestFlushedSerial; assert(_serialNum >= newestFlushedSerial); - TransactionLogManager::prepareReplay( - _tlsMgr.getClient(), - _docTypeName.getName(), - flushedIndexMgrSerial, - flushedSummaryMgrSerial, - config_store); + TransactionLogManager::prepareReplay(_tlsMgr.getClient(), _docTypeName.getName(), + flushedIndexMgrSerial, flushedSummaryMgrSerial, config_store); _tlsReplayProgress = _tlsMgr.startReplay(_prunedSerialNum, _serialNum, *this); } - void FeedHandler::flushDone(SerialNum flushedSerial) { // Called by flush worker thread after performing a flush task - _writeService.master().execute( - makeTask( - makeClosure( - this, - &FeedHandler::performFlushDone, - flushedSerial))); + _writeService.master().execute(makeLambdaTask([this, flushedSerial]() { performFlushDone(flushedSerial); })); + } void FeedHandler::changeToNormalFeedState() { changeFeedState(FeedState::SP(new NormalState(*this))); } -void -FeedHandler::waitForReplayDone() -{ - _tlsMgr.waitForReplayDone(); -} - -void FeedHandler::setReplayDone() { - _tlsMgr.changeReplayDone(); -} - -bool FeedHandler::getReplayDone() const { - return _tlsMgr.getReplayDone(); -} - bool FeedHandler::isDoingReplay() const { return _tlsMgr.isDoingReplay(); @@ -624,9 +524,7 @@ void FeedHandler::storeOperation(FeedOperation &op) { void FeedHandler::tlsPrune(SerialNum oldest_to_keep) { if (!_tlsWriter.erase(oldest_to_keep)) { - throw vespalib::IllegalStateException(vespalib::make_string( - "Failed to prune TLS to token %" PRIu64 ".", - oldest_to_keep)); + throw IllegalStateException(make_string("Failed to prune TLS to token %" PRIu64 ".", oldest_to_keep)); } _prunedSerialNum = oldest_to_keep; } @@ -644,8 +542,8 @@ void feedOperationRejected(FeedToken *token, const vespalib::string &opType, con DocTypeName docTypeName, const vespalib::string &rejectMessage) { if (token) { - vespalib::string message = make_string("%s operation rejected for document '%s' of type '%s': '%s'", - opType.c_str(), docId.c_str(), docTypeName.toString().c_str(), rejectMessage.c_str()); + auto message = make_string("%s operation rejected for document '%s' of type '%s': '%s'", + opType.c_str(), docId.c_str(), docTypeName.toString().c_str(), rejectMessage.c_str()); token->setResult(ResultUP(new ResultType(Result::RESOURCE_EXHAUSTED, message)), false); token->fail(documentapi::DocumentProtocol::ERROR_REJECTED, message); } @@ -721,9 +619,9 @@ FeedHandler::performOperation(FeedToken::UP token, FeedOperation::UP op) void FeedHandler::handleOperation(FeedToken token, FeedOperation::UP op) { - _writeService.master().execute( - makeTask(makeClosure(this, - &FeedHandler::doHandleOperation, token, std::move(op)))); + _writeService.master().execute(makeLambdaTask([this, token = std::move(token), op = std::move(op)]() mutable { + doHandleOperation(std::move(token), std::move(op)); + })); } void @@ -738,7 +636,6 @@ FeedHandler::handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCa _activeFeedView->handleMove(op, std::move(moveDoneCtx)); } - void FeedHandler::heartBeat() { @@ -746,15 +643,13 @@ FeedHandler::heartBeat() _activeFeedView->heartBeat(_serialNum); } - void FeedHandler::sync() { - _writeService.master().execute(makeTask(makeClosure(this, &FeedHandler::performSync))); + _writeService.master().execute(makeLambdaTask([this]() { performSync(); })); _writeService.sync(); } - FeedHandler::RPC::Result FeedHandler::receive(const Packet &packet) { @@ -768,34 +663,24 @@ FeedHandler::receive(const Packet &packet) return wrap->result; } - void FeedHandler::eof() { // Only called by visit, subscription gets one or more inSync() callbacks. - _writeService.master().execute(makeTask(makeClosure(this, &FeedHandler::performEof))); + _writeService.master().execute(makeLambdaTask([this]() { performEof(); })); } - -void -FeedHandler::inSync() -{ - // Called by visit callback thread, when in sync -} - - void FeedHandler:: performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) { const LidVectorContext::SP lids_to_remove = pruneOp.getLidsToRemove(); - if (lids_to_remove.get() && lids_to_remove->getNumLids() != 0) { + if (lids_to_remove && lids_to_remove->getNumLids() != 0) { storeOperation(pruneOp); _activeFeedView->handlePruneRemovedDocuments(pruneOp); } } - void FeedHandler::syncTls(SerialNum syncTo) { @@ -805,11 +690,7 @@ FeedHandler::syncTls(SerialNum syncTo) return; } if (!_allowSync) { - throw vespalib::IllegalStateException( - vespalib::make_string( - "Attempted to sync TLS to token %" PRIu64 - " at wrong time.", - syncTo)); + throw IllegalStateException(make_string("Attempted to sync TLS to token %" PRIu64 " at wrong time.", syncTo)); } SerialNum syncedTo(_tlsWriter.sync(syncTo)); { @@ -819,15 +700,4 @@ FeedHandler::syncTls(SerialNum syncTo) } } -void -FeedHandler::storeRemoteOperation(const FeedOperation &op) -{ - SerialNum serialNum(op.getSerialNum()); - assert(serialNum != 0); - if (serialNum > _serialNum) { - _tlsWriter.storeOperation(op); - _serialNum = serialNum; - } -} - } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 32ff8f5d690..335a86e0279 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -35,12 +35,7 @@ class RemoveOperation; class SplitBucketOperation; class UpdateOperation; -namespace bucketdb -{ - -class IBucketDBHandler; - -} +namespace bucketdb { class IBucketDBHandler; } /** * Class handling all aspects of feeding for a document database. @@ -122,18 +117,12 @@ private: void createNonExistingDocument(FeedTokenUP, const UpdateOperation &op); void performRemove(FeedTokenUP token, RemoveOperation &op); -private: void performGarbageCollect(FeedTokenUP token); void performCreateBucket(FeedTokenUP token, CreateBucketOperation &op); void performDeleteBucket(FeedTokenUP token, DeleteBucketOperation &op); void performSplit(FeedTokenUP token, SplitBucketOperation &op); void performJoin(FeedTokenUP token, JoinBucketsOperation &op); void performSync(); - - /** - * Used during callback from transaction log. - */ - void handleTransactionLogEntry(const Packet::Entry &entry); void performEof(); /** @@ -142,22 +131,9 @@ private: void performFlushDone(SerialNum flushedSerial); void performPrune(SerialNum flushedSerial); -public: - void considerDelayedPrune(); - -private: - /** - * Returns the current feed state of this feed handler. - */ FeedStateSP getFeedState() const; - - /** - * Used to handle feed state transitions. - */ void changeFeedState(FeedStateSP newState); - void changeFeedState(FeedStateSP newState, const vespalib::LockGuard &feedGuard); - public: FeedHandler(const FeedHandler &) = delete; FeedHandler & operator = (const FeedHandler &) = delete; @@ -184,22 +160,19 @@ public: search::transactionlog::Writer & writer, TlsWriter * tlsWriter = nullptr); - virtual - ~FeedHandler(); + virtual ~FeedHandler(); /** * Init this feed handler. * * @param oldestConfigSerial The serial number of the oldest config snapshot. */ - void - init(SerialNum oldestConfigSerial); + void init(SerialNum oldestConfigSerial); /** * Close this feed handler and its components. */ - void - close(); + void close(); /** * Start replay of the transaction log. @@ -223,8 +196,7 @@ public: * * @param flushedSerial serial number flushed for all relevant flush targets. */ - void - flushDone(SerialNum flushedSerial); + void flushDone(SerialNum flushedSerial); /** * Used to flip between normal and recovery feed states. @@ -235,33 +207,22 @@ public: * Update the active feed view. * Always called by the master write thread so locking is not needed. */ - void - setActiveFeedView(IFeedView *feedView) - { + void setActiveFeedView(IFeedView *feedView) { _activeFeedView = feedView; } - void - setBucketDBHandler(bucketdb::IBucketDBHandler *bucketDBHandler) - { + void setBucketDBHandler(bucketdb::IBucketDBHandler *bucketDBHandler) { _bucketDBHandler = bucketDBHandler; } - /** - * Wait until transaction log is replayed. - */ - void waitForReplayDone(); - void setSerialNum(SerialNum serialNum) { _serialNum = serialNum; } SerialNum incSerialNum() { return ++_serialNum; } SerialNum getSerialNum() const override { return _serialNum; } SerialNum getPrunedSerialNum() const { return _prunedSerialNum; } - void setReplayDone(); - bool getReplayDone() const; bool isDoingReplay() const; float getReplayProgress() const { - return _tlsReplayProgress.get() != nullptr ? _tlsReplayProgress->getProgress() : 0; + return _tlsReplayProgress ? _tlsReplayProgress->getProgress() : 0; } bool getTransactionLogReplayDone() const; vespalib::string getDocTypeName() const { return _docTypeName.getName(); } @@ -270,46 +231,17 @@ public: void performOperation(FeedTokenUP token, FeedOperationUP op); void handleOperation(FeedToken token, FeedOperationUP op); - /** - * Implements IDocumentMoveHandler - */ - virtual void handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCallback> moveDoneCtx) override; - - /** - * Implements IHeartBeatHandler - */ - virtual void - heartBeat() override; - - virtual void - sync(); - - /** - * Implements TransLogClient::Session::Callback. - */ - virtual RPC::Result - receive(const Packet &packet) override; - - virtual void - eof() override; - - virtual void - inSync() override; + void handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCallback> moveDoneCtx) override; + void heartBeat() override; - /** - * Implements IPruneRemovedDocumentsHandler - */ - void - performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override; + virtual void sync(); + RPC::Result receive(const Packet &packet) override; - void - syncTls(SerialNum syncTo); - - void - storeRemoteOperation(const FeedOperation &op); - - // Implements IOperationStorer - virtual void storeOperation(FeedOperation &op) override; + void eof() override; + void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override; + void syncTls(SerialNum syncTo); + void storeOperation(FeedOperation &op) override; + void considerDelayedPrune(); }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp index 556ae756f13..541de44d44e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp @@ -10,7 +10,6 @@ #include <vespa/searchcore/proton/feedoperation/removeoperation.h> #include <vespa/searchcore/proton/feedoperation/splitbucketoperation.h> #include <vespa/searchcore/proton/feedoperation/updateoperation.h> -#include <vespa/persistence/spi/result.h> using storage::spi::Bucket; using storage::spi::Timestamp; diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp index 62ea321efbb..95f31f141d7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp @@ -23,10 +23,7 @@ TransactionLogManagerBase::TransactionLogManagerBase( { } -TransactionLogManagerBase::~TransactionLogManagerBase() -{ -} - +TransactionLogManagerBase::~TransactionLogManagerBase() = default; TransactionLogManagerBase::StatusResult TransactionLogManagerBase::init() @@ -65,7 +62,6 @@ TransactionLogManagerBase::init() return res; } - void TransactionLogManagerBase::internalStartReplay() { @@ -77,7 +73,6 @@ TransactionLogManagerBase::internalStartReplay() _replayStartTime = timer.MilliSecs(); } - void TransactionLogManagerBase::markReplayStarted() { @@ -85,7 +80,6 @@ TransactionLogManagerBase::markReplayStarted() _replayStarted = true; } - void TransactionLogManagerBase::changeReplayDone() { vespalib::MonitorGuard guard(_replayMonitor); @@ -93,7 +87,6 @@ void TransactionLogManagerBase::changeReplayDone() guard.broadcast(); } - void TransactionLogManagerBase::waitForReplayDone() const { @@ -103,7 +96,6 @@ TransactionLogManagerBase::waitForReplayDone() const } } - void TransactionLogManagerBase::close() { @@ -117,11 +109,6 @@ TransactionLogManagerBase::close() } } -TransLogClient::Subscriber::UP TransactionLogManagerBase::createTlcSubscriber( - TransLogClient::Session::Callback &callback) { - return _tlc.createSubscriber(_domainName, callback); -} - TransLogClient::Visitor::UP TransactionLogManagerBase::createTlcVisitor( TransLogClient::Session::Callback &callback) { return _tlc.createVisitor(_domainName, callback); diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h index ae2a8356016..1b109d8d9e1 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h @@ -51,10 +51,7 @@ public: void changeReplayDone(); void close(); - TransLogClient::Subscriber::UP createTlcSubscriber( - TransLogClient::Session::Callback &callback); - TransLogClient::Visitor::UP createTlcVisitor( - TransLogClient::Session::Callback &callback); + TransLogClient::Visitor::UP createTlcVisitor(TransLogClient::Session::Callback &callback); void waitForReplayDone() const; @@ -64,8 +61,7 @@ public: bool getReplayDone() const; bool isDoingReplay() const; void logReplayComplete() const; - const vespalib::string &getRpcTarget() const - { return _tlc.getRPCTarget(); } + const vespalib::string &getRpcTarget() const { return _tlc.getRPCTarget(); } void markReplayStarted(); |