diff options
7 files changed, 104 insertions, 46 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp index a48c6373c00..5c304a1c7fc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp @@ -44,7 +44,7 @@ FastAccessFeedView::putAttributes(SerialNum serialNum, { _attributeWriter->put(serialNum, doc, lid, immediateCommit, onWriteDone); if (immediateCommit && onWriteDone) { - onWriteDone->registerPutLid(lid, &_docIdLimit); + onWriteDone->registerPutLid(&_docIdLimit); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp index ad50e3a92aa..8462e3d0a77 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp @@ -3,15 +3,25 @@ #include "putdonecontext.h" #include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchcore/proton/common/docid_limit.h> +#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> namespace proton { PutDoneContext::PutDoneContext(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics) + PerDocTypeFeedMetrics &metrics, + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, + uint32_t lid, + search::SerialNum serialNum, + bool changedDbdId) : OperationDoneContext(std::move(token), opType, metrics), - _lid(0), - _docIdLimit(nullptr) + _lid(lid), + _docIdLimit(nullptr), + _gidToLidChangeHandler(gidToLidChangeHandler), + _gid(gid), + _serialNum(serialNum), + _changedDbdId(changedDbdId) { } @@ -20,6 +30,9 @@ PutDoneContext::~PutDoneContext() if (_docIdLimit != nullptr) { _docIdLimit->bumpUpLimit(_lid + 1); } + if (_changedDbdId) { + _gidToLidChangeHandler.notifyPut(_gid, _lid, _serialNum); + } } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h index ba94891fd6e..5a5ff849ac7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h @@ -3,12 +3,15 @@ #pragma once #include "operationdonecontext.h" +#include <vespa/document/base/globalid.h> +#include <vespa/searchlib/common/serialnum.h> namespace proton { class DocIdLimit; +class IGidToLidChangeHandler; /** * Context class for document put operations that acks operation when @@ -21,17 +24,25 @@ class PutDoneContext : public OperationDoneContext { uint32_t _lid; DocIdLimit *_docIdLimit; + IGidToLidChangeHandler &_gidToLidChangeHandler; + document::GlobalId _gid; + search::SerialNum _serialNum; + bool _changedDbdId; // lid or document subdb changed public: PutDoneContext(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics); + PerDocTypeFeedMetrics &metrics, + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, + uint32_t lid, + search::SerialNum serialNum, + bool changedDbdId); virtual ~PutDoneContext(); - void registerPutLid(uint32_t lid, DocIdLimit *docIdLimit) + void registerPutLid(DocIdLimit *docIdLimit) { - _lid = lid; _docIdLimit = docIdLimit; } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp index 219d7482853..301224cf134 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp @@ -3,6 +3,7 @@ #include "removedonecontext.h" #include "removedonetask.h" #include <vespa/searchcore/proton/common/feedtoken.h> +#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> namespace proton { @@ -11,10 +12,16 @@ RemoveDoneContext::RemoveDoneContext(std::unique_ptr<FeedToken> token, PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, - uint32_t lid) + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, + uint32_t lid, + search::SerialNum serialNum) : OperationDoneContext(std::move(token), opType, metrics), _executor(executor), - _task() + _task(), + _gidToLidChangeHandler(gidToLidChangeHandler), + _gid(gid), + _serialNum(serialNum) { if (lid != 0) { _task = std::make_unique<RemoveDoneTask>(documentMetaStore, lid); @@ -23,6 +30,7 @@ RemoveDoneContext::RemoveDoneContext(std::unique_ptr<FeedToken> token, RemoveDoneContext::~RemoveDoneContext() { + _gidToLidChangeHandler.notifyRemoveDone(_gid, _serialNum); ack(); if (_task) { vespalib::Executor::Task::UP res = _executor.execute(std::move(_task)); diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h index b59219fc8c9..ccd480a5a91 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h @@ -4,11 +4,14 @@ #include "operationdonecontext.h" #include <vespa/vespalib/util/executor.h> +#include <vespa/document/base/globalid.h> +#include <vespa/searchlib/common/serialnum.h> namespace proton { class IDocumentMetaStore; +class IGidToLidChangeHandler; /** @@ -23,6 +26,9 @@ class RemoveDoneContext : public OperationDoneContext { vespalib::Executor &_executor; std::unique_ptr<vespalib::Executor::Task> _task; + IGidToLidChangeHandler &_gidToLidChangeHandler; + document::GlobalId _gid; + search::SerialNum _serialNum; public: RemoveDoneContext(std::unique_ptr<FeedToken> token, @@ -30,7 +36,10 @@ public: PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, - uint32_t lid); + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, + uint32_t lid, + search::SerialNum serialNum); virtual ~RemoveDoneContext(); }; diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 62e01db28e0..749078c56ba 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -27,6 +27,7 @@ using document::DocumentUpdate; using search::index::Schema; using search::makeLambdaTask; using search::IDestructorCallback; +using search::SerialNum; using storage::spi::BucketInfoResult; using storage::spi::Timestamp; using vespalib::IllegalStateException; @@ -57,32 +58,37 @@ private: public: PutDoneContextForMove(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics, IDestructorCallback::SP moveDoneCtx) - : PutDoneContext(std::move(token), opType, metrics), + PerDocTypeFeedMetrics &metrics, + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, + uint32_t lid, search::SerialNum serialNum, bool changedDbdId, IDestructorCallback::SP moveDoneCtx) + : PutDoneContext(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, changedDbdId), _moveDoneCtx(std::move(moveDoneCtx)) {} virtual ~PutDoneContextForMove() {} }; std::shared_ptr<PutDoneContext> -createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, bool force, +createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, uint32_t lid, + SerialNum serialNum, bool changedDbdId, IDestructorCallback::SP moveDoneCtx) { std::shared_ptr<PutDoneContext> result; - if (token || force) { - if (moveDoneCtx) { - result = std::make_shared<PutDoneContextForMove>(std::move(token), opType, metrics, std::move(moveDoneCtx)); - } else { - result = std::make_shared<PutDoneContext>(std::move(token), opType, metrics); - } + if (moveDoneCtx) { + result = std::make_shared<PutDoneContextForMove>(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, changedDbdId, std::move(moveDoneCtx)); + } else { + result = std::make_shared<PutDoneContext>(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, changedDbdId); } return result; } std::shared_ptr<PutDoneContext> -createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, bool force) +createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, uint32_t lid, SerialNum serialNum, bool changedDbdId) { - return createPutDoneContext(token, opType, metrics, force, IDestructorCallback::SP()); + return createPutDoneContext(token, opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, changedDbdId, IDestructorCallback::SP()); } std::shared_ptr<UpdateDoneContext> @@ -109,9 +115,13 @@ private: public: RemoveDoneContextForMove(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, uint32_t lid, + IDocumentMetaStore &documentMetaStore, + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, + uint32_t lid, + SerialNum serialNum, IDestructorCallback::SP moveDoneCtx) - : RemoveDoneContext(std::move(token), opType, metrics, executor, documentMetaStore, lid), + : RemoveDoneContext(std::move(token), opType, metrics, executor, documentMetaStore, gidToLidChangeHandler, gid, lid, serialNum), _moveDoneCtx(std::move(moveDoneCtx)) {} virtual ~RemoveDoneContextForMove() {} @@ -120,15 +130,19 @@ public: std::shared_ptr<RemoveDoneContext> createRemoveDoneContext(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, uint32_t lid, + IDocumentMetaStore &documentMetaStore, + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, + uint32_t lid, + SerialNum serialNum, IDestructorCallback::SP moveDoneCtx) { if (moveDoneCtx) { return std::make_shared<RemoveDoneContextForMove> - (std::move(token), opType, metrics, executor, documentMetaStore, lid, std::move(moveDoneCtx)); + (std::move(token), opType, metrics, executor, documentMetaStore, gidToLidChangeHandler, gid, lid, serialNum, std::move(moveDoneCtx)); } else { return std::make_shared<RemoveDoneContext> - (std::move(token), opType, metrics, executor, documentMetaStore, lid); + (std::move(token), opType, metrics, executor, documentMetaStore, gidToLidChangeHandler, gid, lid, serialNum); } } @@ -281,7 +295,6 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) putOp.getSubDbId(), putOp.getLid(), putOp.getPrevSubDbId(), putOp.getPrevLid(), _params._subDbId, doc->toString(true).size(), doc->toString(true).c_str()); - uint32_t oldDocIdLimit = _metaStore.getCommittedDocIdLimit(); adjustMetaStore(putOp, docId); considerEarlyAck(token, putOp.getType()); @@ -289,16 +302,18 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) if (putOp.getValidDbdId(_params._subDbId)) { bool immediateCommit = _commitTimeTracker.needCommit(); + const document::GlobalId &gid = docId.getGlobalId(); std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(token, putOp.getType(), _params._metrics, - immediateCommit && putOp.getLid() >= oldDocIdLimit); + _gidToLidChangeHandler, gid, putOp.getLid(), serialNum, putOp.changedDbdId()); putSummary(serialNum, putOp.getLid(), doc, onWriteDone); putAttributes(serialNum, putOp.getLid(), *doc, immediateCommit, onWriteDone); putIndexedFields(serialNum, putOp.getLid(), doc, immediateCommit, onWriteDone); } if (docAlreadyExists && putOp.changedDbdId()) { assert(!putOp.getValidDbdId(_params._subDbId)); - internalRemove(std::move(token), serialNum, putOp.getPrevLid(), putOp.getType(), IDestructorCallback::SP()); + const document::GlobalId &gid = docId.getGlobalId(); + internalRemove(std::move(token), serialNum, gid, putOp.getPrevLid(), putOp.getType(), IDestructorCallback::SP()); } if (token.get() != NULL) { token->ack(putOp.getType(), _params._metrics); @@ -575,7 +590,8 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm if (rmOp.getValidPrevDbdId(_params._subDbId)) { if (rmOp.changedDbdId()) { assert(!rmOp.getValidDbdId(_params._subDbId)); - internalRemove(std::move(token), serialNum, rmOp.getPrevLid(), rmOp.getType(), IDestructorCallback::SP()); + const document::GlobalId &gid = docId.getGlobalId(); + internalRemove(std::move(token), serialNum, gid, rmOp.getPrevLid(), rmOp.getType(), IDestructorCallback::SP()); } } if (token.get() != NULL) { @@ -584,7 +600,7 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm } void -StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, Lid lid, +StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, const document::GlobalId &gid, Lid lid, FeedOperation::Type opType, IDestructorCallback::SP moveDoneCtx) { removeSummary(serialNum, lid); @@ -592,10 +608,10 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, Lid std::shared_ptr<RemoveDoneContext> onWriteDone; if (explicitReuseLid || token) { onWriteDone = createRemoveDoneContext(std::move(token), opType, _params._metrics, _writeService.master(), - _metaStore, (explicitReuseLid ? lid : 0u), moveDoneCtx); + _metaStore, _gidToLidChangeHandler, gid, (explicitReuseLid ? lid : 0u), serialNum, moveDoneCtx); } else if (moveDoneCtx) { onWriteDone = createRemoveDoneContext(FeedToken::UP(), opType, _params._metrics, _writeService.master(), - _metaStore, 0u, moveDoneCtx); + _metaStore, _gidToLidChangeHandler, gid, 0u, serialNum, moveDoneCtx); } bool immediateCommit = _commitTimeTracker.needCommit(); removeAttributes(serialNum, lid, immediateCommit, onWriteDone); @@ -613,17 +629,12 @@ StoreOnlyFeedView::adjustMetaStore(const DocumentOperation &op, const DocumentId op.getLid() != op.getPrevLid()) { moveMetaData(_metaStore, docId, op); - _gidToLidChangeHandler.notifyPut(docId.getGlobalId(), op.getLid(), serialNum); } else { putMetaData(_metaStore, docId, op, _params._subDbType == SubDbType::REMOVED); - if (op.getDbDocumentId() != op.getPrevDbDocumentId()) { - _gidToLidChangeHandler.notifyPut(docId.getGlobalId(), op.getLid(), serialNum); - } } } else if (op.getValidPrevDbdId(_params._subDbId)) { - removeMetaData(_metaStore, docId, op, _params._subDbType == SubDbType::REMOVED); _gidToLidChangeHandler.notifyRemove(docId.getGlobalId(), serialNum); - _gidToLidChangeHandler.notifyRemoveDone(docId.getGlobalId(), serialNum); + removeMetaData(_metaStore, docId, op, _params._subDbType == SubDbType::REMOVED); } _metaStore.commit(serialNum, serialNum); } @@ -652,12 +663,14 @@ StoreOnlyFeedView::removeDocuments(const RemoveDocumentsOperation &op, bool remo bool explicitReuseLids = false; if (useDMS) { std::vector<document::GlobalId> gidsToRemove(getGidsToRemove(_metaStore, lidsToRemove)); - _metaStore.removeBatch(lidsToRemove, ctx->getDocIdLimit()); for (const auto &gid : gidsToRemove) { _gidToLidChangeHandler.notifyRemove(gid, serialNum); - _gidToLidChangeHandler.notifyRemoveDone(gid, serialNum); } + _metaStore.removeBatch(lidsToRemove, ctx->getDocIdLimit()); _metaStore.commit(serialNum, serialNum); + for (const auto &gid : gidsToRemove) { + _gidToLidChangeHandler.notifyRemoveDone(gid, serialNum); + } explicitReuseLids = _lidReuseDelayer.delayReuse(lidsToRemove); } std::shared_ptr<search::IDestructorCallback> onWriteDone; @@ -741,26 +754,28 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: moveOp.getSubDbId(), moveOp.getLid(), moveOp.getPrevSubDbId(), moveOp.getPrevLid(), _params._subDbId, doc->toString(true).size(), doc->toString(true).c_str()); - uint32_t oldDocIdLimit = _metaStore.getCommittedDocIdLimit(); adjustMetaStore(moveOp, docId); bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId); if (moveOp.getValidDbdId(_params._subDbId)) { bool immediateCommit = _commitTimeTracker.needCommit(); + const document::GlobalId &gid = docId.getGlobalId(); FeedToken::UP token; std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(token, moveOp.getType(), _params._metrics, - immediateCommit && (moveOp.getLid() >= oldDocIdLimit), doneCtx); + _gidToLidChangeHandler, gid, moveOp.getLid(), + serialNum, moveOp.changedDbdId(), doneCtx); putSummary(serialNum, moveOp.getLid(), doc, onWriteDone); putAttributes(serialNum, moveOp.getLid(), *doc, immediateCommit, onWriteDone); putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone); } if (docAlreadyExists && moveOp.changedDbdId()) { - internalRemove(FeedToken::UP(), serialNum, moveOp.getPrevLid(), moveOp.getType(), doneCtx); + const document::GlobalId &gid = docId.getGlobalId(); + internalRemove(FeedToken::UP(), serialNum, gid, moveOp.getPrevLid(), moveOp.getType(), doneCtx); } } void -StoreOnlyFeedView::heartBeat(search::SerialNum serialNum) +StoreOnlyFeedView::heartBeat(SerialNum serialNum) { assert(_writeService.master().isCurrentThread()); _metaStore.removeAllOldGenerations(); diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index 1302ef7b632..3b3a5efbe24 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -22,6 +22,8 @@ namespace search { class IDestructorCallback; } +namespace document { class GLobalId; } + namespace proton { class IReplayConfig; @@ -178,7 +180,7 @@ private: size_t removeDocuments(const RemoveDocumentsOperation &op, bool remove_index_and_attribute_fields, bool immediateCommit); - void internalRemove(FeedTokenUP token, SerialNum serialNum, Lid lid, + void internalRemove(FeedTokenUP token, SerialNum serialNum, const document::GlobalId &gid, Lid lid, FeedOperation::Type opType, std::shared_ptr<search::IDestructorCallback> moveDoneCtx); // Ack token early if visibility delay is nonzero |