diff options
5 files changed, 88 insertions, 254 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h index 6bca156ddba..fd97cc6242a 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h @@ -8,9 +8,7 @@ #include <vespa/searchcore/proton/feedoperation/feedoperation.h> #include <atomic> -namespace proton -{ - +namespace proton { class PerDocTypeFeedMetrics; typedef std::unique_ptr<storage::spi::Result> ResultUP; @@ -38,7 +36,6 @@ private: State & operator = (const State &) = delete; State(ITransport & transport, mbus::Reply::UP reply, uint32_t numAcksRequired); ~State(); - void setNumAcksRequired(uint32_t numAcksRequired) { _unAckedCount = numAcksRequired; } void ack(); void ack(const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics); @@ -81,6 +78,12 @@ public: */ FeedToken(ITransport &transport, mbus::Reply::UP reply); + FeedToken(FeedToken &&) = default; + FeedToken & operator =(FeedToken &&) = default; + FeedToken(const FeedToken &) = default; + FeedToken & operator =(const FeedToken &) = default; + ~FeedToken() = default; + /** * Passes a receipt back to the originating FeedEngine, declaring that this * operation succeeded. If an error occured while processing the operation, @@ -88,15 +91,11 @@ public: */ void ack() const { _state->ack(); } - void - ack(const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics) const - { + void ack(const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics) const { _state->ack(opType, metrics); } - void - incNeededAcks() const - { + void incNeededAcks() const { _state->incNeededAcks(); } @@ -148,14 +147,6 @@ public: _state->setResult(std::move(result), documentWasFound); } - /** - * This controls how many acks are required before it is acked back to the sender. - * Default is 1, and so far only adjusted by multioperation handling. - * - * @param numAcksRequired How many acks must be received before it is considered acked. - */ - void setNumAcksRequired(uint32_t numAcksRequired) const { _state->setNumAcksRequired(numAcksRequired); } - FastOS_Time getStartTime() const { return _state->getStartTime(); } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 6b48c94fabd..7cb68a8721b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -18,7 +18,6 @@ #include <vespa/searchcore/proton/persistenceengine/transport_latch.h> #include <vespa/searchcorespi/index/ithreadingservice.h> #include <vespa/searchlib/common/idestructorcallback.h> -#include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/exceptions.h> #include <unistd.h> @@ -41,47 +40,40 @@ using storage::spi::Timestamp; using storage::spi::UpdateResult; using vespalib::Executor; using vespalib::IllegalStateException; -using vespalib::makeClosure; -using vespalib::makeTask; +using vespalib::makeLambdaTask; using vespalib::make_string; using vespalib::MonitorGuard; using vespalib::LockGuard; namespace proton { - namespace { + void setUpdateWasFound(mbus::Reply &reply, bool was_found) { - assert(static_cast<DocumentReply&>(reply).getType() == - DocumentProtocol::REPLY_UPDATEDOCUMENT); + assert(static_cast<DocumentReply&>(reply).getType() == DocumentProtocol::REPLY_UPDATEDOCUMENT); UpdateDocumentReply &update_rep = static_cast<UpdateDocumentReply&>(reply); update_rep.setWasFound(was_found); } - void setRemoveWasFound(mbus::Reply &reply, bool was_found) { - assert(static_cast<DocumentReply&>(reply).getType() == - DocumentProtocol::REPLY_REMOVEDOCUMENT); + assert(static_cast<DocumentReply&>(reply).getType() == DocumentProtocol::REPLY_REMOVEDOCUMENT); RemoveDocumentReply &remove_rep = static_cast<RemoveDocumentReply&>(reply); remove_rep.setWasFound(was_found); } - bool ignoreOperation(const DocumentOperation &op) { - return op.getPrevTimestamp() != 0 && - op.getTimestamp() < op.getPrevTimestamp(); + return (op.getPrevTimestamp() != 0) + && (op.getTimestamp() < op.getPrevTimestamp()); } - } // namespace - void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op) { TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op); } @@ -93,7 +85,6 @@ search::SerialNum FeedHandler::TlsMgrWriter::sync(SerialNum syncTo) { for (int retryCount = 0; retryCount < 10; ++retryCount) { - SerialNum syncedTo(0); LOG(spam, "Trying tls sync(%" PRIu64 ")", syncTo); bool res = _tls_mgr.getSession()->sync(syncTo, syncedTo); @@ -103,19 +94,12 @@ FeedHandler::TlsMgrWriter::sync(SerialNum syncTo) continue; } if (syncedTo >= syncTo) { - LOG(spam, - "Tls sync complete, reached %" PRIu64", returning", - syncedTo); + LOG(spam, "Tls sync complete, reached %" PRIu64", returning", syncedTo); return syncedTo; } - LOG(spam, - "Tls sync incomplete, reached %" PRIu64 ", retrying", - syncedTo); + LOG(spam, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo); } - throw vespalib::IllegalStateException( - vespalib::make_string( - "Failed to sync TLS to token %" PRIu64 ".", - syncTo)); + throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo)); return 0; } @@ -123,7 +107,7 @@ void FeedHandler::doHandleOperation(FeedToken token, FeedOperation::UP op) { assert(_writeService.master().isCurrentThread()); - vespalib::LockGuard guard(_feedLock); + LockGuard guard(_feedLock); _feedState->handleOperation(token, std::move(op)); } @@ -131,33 +115,23 @@ void FeedHandler::performPut(FeedToken::UP token, PutOperation &op) { op.assertValid(); _activeFeedView->preparePut(op); if (ignoreOperation(op)) { - LOG(debug, "performPut(): ignoreOperation: docId(%s), " - "timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")", - op.getDocument()->getId().toString().c_str(), - (uint64_t)op.getTimestamp(), - (uint64_t)op.getPrevTimestamp()); - if (token.get() != NULL) { + LOG(debug, "performPut(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")", + op.getDocument()->getId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp()); + if (token) { token->setResult(ResultUP(new Result), false); token->ack(op.getType(), _metrics); } return; } storeOperation(op); - if (token.get() != NULL) { + if (token) { token->setResult(ResultUP(new Result), false); if (token->shouldTrace(1)) { const document::DocumentId &docId = op.getDocument()->getId(); const document::GlobalId &gid = docId.getGlobalId(); - token->trace(1, - make_string( - "Indexing document '%s' (gid = '%s'," - " lid = '%u,%u' prevlid = '%u,%u').", - docId.toString().c_str(), - gid.toString().c_str(), - op.getSubDbId(), - op.getLid(), - op.getPrevSubDbId(), - op.getPrevLid())); + token->trace(1, make_string("Indexing document '%s' (gid = '%s',lid = '%u,%u' prevlid = '%u,%u').", + docId.toString().c_str(), gid.toString().c_str(), op.getSubDbId(), + op.getLid(), op.getPrevSubDbId(), op.getPrevLid())); } } _activeFeedView->handlePut(token.get(), op); @@ -173,15 +147,12 @@ FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op) } else if (op.getUpdate()->getCreateIfNonExistent()) { createNonExistingDocument(std::move(token), op); } else { - if (token.get() != NULL) { + if (token) { token->setResult(ResultUP(new UpdateResult(Timestamp(0))), false); if (token->shouldTrace(1)) { const document::DocumentId &docId = op.getUpdate()->getId(); - token->trace(1, - make_string( - "Document '%s' not found." - " Update operation ignored", - docId.toString().c_str())); + token->trace(1, make_string("Document '%s' not found. Update operation ignored", + docId.toString().c_str())); } setUpdateWasFound(token->getReply(), false); token->ack(op.getType(), _metrics); @@ -194,22 +165,14 @@ void FeedHandler::performInternalUpdate(FeedToken::UP token, UpdateOperation &op) { storeOperation(op); - if (token.get() != NULL) { - token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), - true); + if (token) { + token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true); if (token->shouldTrace(1)) { const document::DocumentId &docId = op.getUpdate()->getId(); const document::GlobalId &gid = docId.getGlobalId(); - token->trace(1, - make_string( - "Updating document '%s' (gid = '%s'," - " lid = '%u,%u' prevlid = '%u,%u').", - docId.toString().c_str(), - gid.toString().c_str(), - op.getSubDbId(), - op.getLid(), - op.getPrevSubDbId(), - op.getPrevLid())); + token->trace(1, make_string("Updating document '%s' (gid = '%s', lid = '%u,%u' prevlid = '%u,%u').", + docId.toString().c_str(), gid.toString().c_str(), op.getSubDbId(), + op.getLid(), op.getPrevSubDbId(), op.getPrevLid())); } setUpdateWasFound(token->getReply(), true); } @@ -226,7 +189,7 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio PutOperation putOp(op.getBucketId(), op.getTimestamp(), doc); _activeFeedView->preparePut(putOp); storeOperation(putOp); - if (token.get() != NULL) { + if (token) { token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true); if (token->shouldTrace(1)) { const document::DocumentId &docId = putOp.getDocument()->getId(); @@ -246,7 +209,7 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio FeedToken putToken(latch, mbus::Reply::UP(new FeedReply(DocumentProtocol::REPLY_PUTDOCUMENT))); _activeFeedView->handlePut(&putToken, putOp); latch.await(); - if (token.get() != NULL) { + if (token) { token->ack(); } } @@ -255,12 +218,9 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { _activeFeedView->prepareRemove(op); if (ignoreOperation(op)) { - LOG(debug, "performRemove(): ignoreOperation: docId(%s), " - "timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")", - op.getDocumentId().toString().c_str(), - (uint64_t)op.getTimestamp(), - (uint64_t)op.getPrevTimestamp()); - if (token.get() != NULL) { + LOG(debug, "performRemove(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")", + op.getDocumentId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp()); + if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); token->ack(op.getType(), _metrics); } @@ -270,23 +230,15 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { assert(op.getValidNewOrPrevDbdId()); assert(op.notMovingLidInSameSubDb()); storeOperation(op); - if (token.get() != NULL) { + if (token) { bool documentWasFound = !op.getPrevMarkedAsRemoved(); - token->setResult(ResultUP(new RemoveResult(documentWasFound)), - documentWasFound); + token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound); if (token->shouldTrace(1)) { const document::DocumentId &docId = op.getDocumentId(); const document::GlobalId &gid = docId.getGlobalId(); - token->trace(1, - make_string( - "Removing document '%s' (gid = '%s'," - " lid = '%u,%u' prevlid = '%u,%u').", - docId.toString().c_str(), - gid.toString().c_str(), - op.getSubDbId(), - op.getLid(), - op.getPrevSubDbId(), - op.getPrevLid())); + token->trace(1, make_string("Removing document '%s' (gid = '%s', lid = '%u,%u' prevlid = '%u,%u').", + docId.toString().c_str(), gid.toString().c_str(), op.getSubDbId(), + op.getLid(), op.getPrevSubDbId(), op.getPrevLid())); } setRemoveWasFound(token->getReply(), documentWasFound); } @@ -294,27 +246,21 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { } else if (op.hasDocType()) { assert(op.getDocType() == _docTypeName.getName()); storeOperation(op); - if (token.get() != NULL) { + if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); if (token->shouldTrace(1)) { - token->trace(1, - make_string( - "Document '%s' not found." - " Remove operation stored.", - op.getDocumentId().toString().c_str())); + token->trace(1, make_string("Document '%s' not found. Remove operation stored.", + op.getDocumentId().toString().c_str())); } setRemoveWasFound(token->getReply(), false); } _activeFeedView->handleRemove(token.get(), op); } else { - if (token.get() != NULL) { + if (token) { token->setResult(ResultUP(new RemoveResult(false)), false); if (token->shouldTrace(1)) { - token->trace(1, - make_string( - "Document '%s' not found." - " Remove operation ignored", - op.getDocumentId().toString().c_str())); + token->trace(1, make_string("Document '%s' not found. Remove operation ignored", + op.getDocumentId().toString().c_str())); } setRemoveWasFound(token->getReply(), false); token->ack(op.getType(), _metrics); @@ -325,15 +271,14 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) { void FeedHandler::performGarbageCollect(FeedToken::UP token) { - if (token.get() != NULL) { + if (token) { token->ack(); } } void -FeedHandler::performCreateBucket(FeedToken::UP token, - CreateBucketOperation &op) +FeedHandler::performCreateBucket(FeedToken::UP token, CreateBucketOperation &op) { storeOperation(op); _bucketDBHandler->handleCreateBucket(op.getBucketId()); @@ -343,8 +288,7 @@ FeedHandler::performCreateBucket(FeedToken::UP token, } -void FeedHandler::performDeleteBucket(FeedToken::UP token, - DeleteBucketOperation &op) { +void FeedHandler::performDeleteBucket(FeedToken::UP token, DeleteBucketOperation &op) { _activeFeedView->prepareDeleteBucket(op); storeOperation(op); // Delete documents in bucket @@ -359,10 +303,7 @@ void FeedHandler::performDeleteBucket(FeedToken::UP token, void FeedHandler::performSplit(FeedToken::UP token, SplitBucketOperation &op) { storeOperation(op); - _bucketDBHandler->handleSplit(op.getSerialNum(), - op.getSource(), - op.getTarget1(), - op.getTarget2()); + _bucketDBHandler->handleSplit(op.getSerialNum(), op.getSource(), op.getTarget1(), op.getTarget2()); if (token) { token->ack(); } @@ -371,10 +312,7 @@ void FeedHandler::performSplit(FeedToken::UP token, SplitBucketOperation &op) { void FeedHandler::performJoin(FeedToken::UP token, JoinBucketsOperation &op) { storeOperation(op); - _bucketDBHandler->handleJoin(op.getSerialNum(), - op.getSource1(), - op.getSource2(), - op.getTarget()); + _bucketDBHandler->handleJoin(op.getSerialNum(), op.getSource1(), op.getSource2(), op.getTarget()); if (token) { token->ack(); } @@ -393,9 +331,7 @@ FeedHandler::performEof() { assert(_writeService.master().isCurrentThread()); _writeService.sync(); - LOG(debug, - "Visiting done for transaction log domain '%s', eof received", - _tlsMgr.getDomainName().c_str()); + LOG(debug, "Visiting done for transaction log domain '%s', eof received", _tlsMgr.getDomainName().c_str()); _owner.onTransactionLogReplayDone(); _tlsMgr.replayDone(); changeToNormalFeedState(); @@ -430,7 +366,7 @@ FeedHandler::performPrune(SerialNum flushedSerial) tlsPrune(flushedSerial); // throws on error LOG(debug, "Pruned TLS to token %" PRIu64 ".", flushedSerial); _owner.onPerformPrune(flushedSerial); - } catch (const vespalib::IllegalStateException & e) { + } catch (const IllegalStateException & e) { LOG(warning, "FeedHandler::performPrune failed due to '%s'.", e.what()); } } @@ -451,7 +387,7 @@ FeedHandler::getFeedState() const { FeedState::SP state; { - vespalib::LockGuard guard(_feedLock); + LockGuard guard(_feedLock); state = _feedState; } return state; @@ -461,17 +397,15 @@ FeedHandler::getFeedState() const void FeedHandler::changeFeedState(FeedState::SP newState) { - vespalib::LockGuard guard(_feedLock); + LockGuard guard(_feedLock); changeFeedState(newState, guard); } void -FeedHandler::changeFeedState(FeedState::SP newState, const vespalib::LockGuard &) +FeedHandler::changeFeedState(FeedState::SP newState, const LockGuard &) { - LOG(debug, - "Change feed state from '%s' -> '%s'", - _feedState->getName().c_str(), newState->getName().c_str()); + LOG(debug, "Change feed state from '%s' -> '%s'", _feedState->getName().c_str(), newState->getName().c_str()); _feedState = newState; } @@ -513,14 +447,10 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _syncLock(), _syncedSerialNum(0), _allowSync(false) -{ -} +{ } -FeedHandler::~FeedHandler() -{ -} - +FeedHandler::~FeedHandler() = default; // Called on DocumentDB creatio void @@ -542,70 +472,40 @@ FeedHandler::close() _tlsMgr.close(); } - void -FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, - SerialNum flushedSummaryMgrSerial, - SerialNum oldestFlushedSerial, - SerialNum newestFlushedSerial, +FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flushedSummaryMgrSerial, + SerialNum oldestFlushedSerial, SerialNum newestFlushedSerial, ConfigStore &config_store) { (void) newestFlushedSerial; assert(_activeFeedView); assert(_bucketDBHandler); FeedState::SP state = std::make_shared<ReplayTransactionLogState> - (getDocTypeName(), - _activeFeedView, - *_bucketDBHandler, - _replayConfig, - config_store); + (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store); changeFeedState(state); // Resurrected attribute vector might cause oldestFlushedSerial to // be lower than _prunedSerialNum, so don't warn for now. (void) oldestFlushedSerial; assert(_serialNum >= newestFlushedSerial); - TransactionLogManager::prepareReplay( - _tlsMgr.getClient(), - _docTypeName.getName(), - flushedIndexMgrSerial, - flushedSummaryMgrSerial, - config_store); + TransactionLogManager::prepareReplay(_tlsMgr.getClient(), _docTypeName.getName(), + flushedIndexMgrSerial, flushedSummaryMgrSerial, config_store); _tlsReplayProgress = _tlsMgr.startReplay(_prunedSerialNum, _serialNum, *this); } - void FeedHandler::flushDone(SerialNum flushedSerial) { // Called by flush worker thread after performing a flush task - _writeService.master().execute( - makeTask( - makeClosure( - this, - &FeedHandler::performFlushDone, - flushedSerial))); + _writeService.master().execute(makeLambdaTask([this, flushedSerial]() { performFlushDone(flushedSerial); })); + } void FeedHandler::changeToNormalFeedState() { changeFeedState(FeedState::SP(new NormalState(*this))); } -void -FeedHandler::waitForReplayDone() -{ - _tlsMgr.waitForReplayDone(); -} - -void FeedHandler::setReplayDone() { - _tlsMgr.changeReplayDone(); -} - -bool FeedHandler::getReplayDone() const { - return _tlsMgr.getReplayDone(); -} - bool FeedHandler::isDoingReplay() const { return _tlsMgr.isDoingReplay(); @@ -624,9 +524,7 @@ void FeedHandler::storeOperation(FeedOperation &op) { void FeedHandler::tlsPrune(SerialNum oldest_to_keep) { if (!_tlsWriter.erase(oldest_to_keep)) { - throw vespalib::IllegalStateException(vespalib::make_string( - "Failed to prune TLS to token %" PRIu64 ".", - oldest_to_keep)); + throw IllegalStateException(make_string("Failed to prune TLS to token %" PRIu64 ".", oldest_to_keep)); } _prunedSerialNum = oldest_to_keep; } @@ -644,8 +542,8 @@ void feedOperationRejected(FeedToken *token, const vespalib::string &opType, con DocTypeName docTypeName, const vespalib::string &rejectMessage) { if (token) { - vespalib::string message = make_string("%s operation rejected for document '%s' of type '%s': '%s'", - opType.c_str(), docId.c_str(), docTypeName.toString().c_str(), rejectMessage.c_str()); + auto message = make_string("%s operation rejected for document '%s' of type '%s': '%s'", + opType.c_str(), docId.c_str(), docTypeName.toString().c_str(), rejectMessage.c_str()); token->setResult(ResultUP(new ResultType(Result::RESOURCE_EXHAUSTED, message)), false); token->fail(documentapi::DocumentProtocol::ERROR_REJECTED, message); } @@ -721,9 +619,9 @@ FeedHandler::performOperation(FeedToken::UP token, FeedOperation::UP op) void FeedHandler::handleOperation(FeedToken token, FeedOperation::UP op) { - _writeService.master().execute( - makeTask(makeClosure(this, - &FeedHandler::doHandleOperation, token, std::move(op)))); + _writeService.master().execute(makeLambdaTask([this, token = std::move(token), op = std::move(op)]() mutable { + doHandleOperation(std::move(token), std::move(op)); + })); } void @@ -738,7 +636,6 @@ FeedHandler::handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCa _activeFeedView->handleMove(op, std::move(moveDoneCtx)); } - void FeedHandler::heartBeat() { @@ -746,15 +643,13 @@ FeedHandler::heartBeat() _activeFeedView->heartBeat(_serialNum); } - void FeedHandler::sync() { - _writeService.master().execute(makeTask(makeClosure(this, &FeedHandler::performSync))); + _writeService.master().execute(makeLambdaTask([this]() { performSync(); })); _writeService.sync(); } - FeedHandler::RPC::Result FeedHandler::receive(const Packet &packet) { @@ -768,12 +663,11 @@ FeedHandler::receive(const Packet &packet) return wrap->result; } - void FeedHandler::eof() { // Only called by visit, subscription gets one or more inSync() callbacks. - _writeService.master().execute(makeTask(makeClosure(this, &FeedHandler::performEof))); + _writeService.master().execute(makeLambdaTask([this]() { performEof(); })); } void @@ -781,13 +675,12 @@ FeedHandler:: performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) { const LidVectorContext::SP lids_to_remove = pruneOp.getLidsToRemove(); - if (lids_to_remove.get() && lids_to_remove->getNumLids() != 0) { + if (lids_to_remove && lids_to_remove->getNumLids() != 0) { storeOperation(pruneOp); _activeFeedView->handlePruneRemovedDocuments(pruneOp); } } - void FeedHandler::syncTls(SerialNum syncTo) { @@ -797,11 +690,7 @@ FeedHandler::syncTls(SerialNum syncTo) return; } if (!_allowSync) { - throw vespalib::IllegalStateException( - vespalib::make_string( - "Attempted to sync TLS to token %" PRIu64 - " at wrong time.", - syncTo)); + throw IllegalStateException(make_string("Attempted to sync TLS to token %" PRIu64 " at wrong time.", syncTo)); } SerialNum syncedTo(_tlsWriter.sync(syncTo)); { @@ -811,15 +700,4 @@ FeedHandler::syncTls(SerialNum syncTo) } } -void -FeedHandler::storeRemoteOperation(const FeedOperation &op) -{ - SerialNum serialNum(op.getSerialNum()); - assert(serialNum != 0); - if (serialNum > _serialNum) { - _tlsWriter.storeOperation(op); - _serialNum = serialNum; - } -} - } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 94e70ff0fd2..335a86e0279 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -35,12 +35,7 @@ class RemoveOperation; class SplitBucketOperation; class UpdateOperation; -namespace bucketdb -{ - -class IBucketDBHandler; - -} +namespace bucketdb { class IBucketDBHandler; } /** * Class handling all aspects of feeding for a document database. @@ -122,18 +117,12 @@ private: void createNonExistingDocument(FeedTokenUP, const UpdateOperation &op); void performRemove(FeedTokenUP token, RemoveOperation &op); -private: void performGarbageCollect(FeedTokenUP token); void performCreateBucket(FeedTokenUP token, CreateBucketOperation &op); void performDeleteBucket(FeedTokenUP token, DeleteBucketOperation &op); void performSplit(FeedTokenUP token, SplitBucketOperation &op); void performJoin(FeedTokenUP token, JoinBucketsOperation &op); void performSync(); - - /** - * Used during callback from transaction log. - */ - void handleTransactionLogEntry(const Packet::Entry &entry); void performEof(); /** @@ -142,22 +131,9 @@ private: void performFlushDone(SerialNum flushedSerial); void performPrune(SerialNum flushedSerial); -public: - void considerDelayedPrune(); - -private: - /** - * Returns the current feed state of this feed handler. - */ FeedStateSP getFeedState() const; - - /** - * Used to handle feed state transitions. - */ void changeFeedState(FeedStateSP newState); - void changeFeedState(FeedStateSP newState, const vespalib::LockGuard &feedGuard); - public: FeedHandler(const FeedHandler &) = delete; FeedHandler & operator = (const FeedHandler &) = delete; @@ -184,7 +160,7 @@ public: search::transactionlog::Writer & writer, TlsWriter * tlsWriter = nullptr); - virtual~FeedHandler(); + virtual ~FeedHandler(); /** * Init this feed handler. @@ -239,18 +215,14 @@ public: _bucketDBHandler = bucketDBHandler; } - void waitForReplayDone(); - void setSerialNum(SerialNum serialNum) { _serialNum = serialNum; } SerialNum incSerialNum() { return ++_serialNum; } SerialNum getSerialNum() const override { return _serialNum; } SerialNum getPrunedSerialNum() const { return _prunedSerialNum; } - void setReplayDone(); - bool getReplayDone() const; bool isDoingReplay() const; float getReplayProgress() const { - return _tlsReplayProgress.get() != nullptr ? _tlsReplayProgress->getProgress() : 0; + return _tlsReplayProgress ? _tlsReplayProgress->getProgress() : 0; } bool getTransactionLogReplayDone() const; vespalib::string getDocTypeName() const { return _docTypeName.getName(); } @@ -263,14 +235,13 @@ public: void heartBeat() override; virtual void sync(); - RPC::Result receive(const Packet &packet) override; void eof() override; void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override; void syncTls(SerialNum syncTo); - void storeRemoteOperation(const FeedOperation &op); void storeOperation(FeedOperation &op) override; + void considerDelayedPrune(); }; } // namespace proton diff --git a/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h b/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h index eefd14f7b01..40d92010c9b 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h +++ b/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h @@ -4,8 +4,7 @@ #include <vespa/vespalib/util/runnable.h> #include <vespa/vespalib/util/threadexecutor.h> -namespace searchcorespi { -namespace index { +namespace searchcorespi::index { /** * Interface for a single thread used for write tasks. @@ -29,7 +28,4 @@ struct IThreadService : public vespalib::ThreadExecutor }; -} // namespace index -} // namespace searchcorespi - - +} diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h index 65dd8cc1f3f..bded09143ab 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h +++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h @@ -7,8 +7,7 @@ #include <vespa/vespalib/util/syncable.h> #include <vespa/searchlib/common/isequencedtaskexecutor.h> -namespace searchcorespi { -namespace index { +namespace searchcorespi::index { /** * Interface for the thread model used for write tasks. @@ -62,5 +61,4 @@ struct IThreadingService : public vespalib::Syncable virtual search::ISequencedTaskExecutor &attributeFieldWriter() = 0; }; -} // namespace index -} // namespace searchcorespi +} |