diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-09-07 14:24:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-07 14:24:38 +0200 |
commit | 96c7b6bb6930a2340680d055a0c8b2e11bc69d81 (patch) | |
tree | d22a557e63f8d5fcf3dc93c81d69a21ca1058dbb | |
parent | 9c8162a9f042c5cf6bebe6771a74c851d0f419ad (diff) | |
parent | eccf4dd0680b829869ce9e2e9920ad9339c0d37c (diff) |
Merge pull request #3361 from vespa-engine/toregge/delay-put-and-removedone-notifications
Delay notifyPut and notifyRemoveDone
11 files changed, 190 insertions, 62 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index 70adc7e47b6..c5b0cd70527 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -77,6 +77,7 @@ vespa_add_library(searchcore_server STATIC pruneremoveddocumentsjob.cpp putdonecontext.cpp reconfig_params.cpp + remove_batch_done_context.cpp removedonecontext.cpp removedonetask.cpp replaypacketdispatcher.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp index 2444223c96f..e46caca4fba 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp @@ -44,7 +44,7 @@ FastAccessFeedView::putAttributes(SerialNum serialNum, { _attributeWriter->put(serialNum, doc, lid, immediateCommit, onWriteDone); if (immediateCommit && onWriteDone) { - onWriteDone->registerPutLid(lid, &_docIdLimit); + onWriteDone->registerPutLid(&_docIdLimit); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp index ad50e3a92aa..f05d8bc0823 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp @@ -3,15 +3,25 @@ #include "putdonecontext.h" #include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchcore/proton/common/docid_limit.h> +#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> namespace proton { PutDoneContext::PutDoneContext(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics) + PerDocTypeFeedMetrics &metrics, + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, + uint32_t lid, + search::SerialNum serialNum, + bool enableNotifyPut) : OperationDoneContext(std::move(token), opType, metrics), - _lid(0), - _docIdLimit(nullptr) + _lid(lid), + _docIdLimit(nullptr), + _gidToLidChangeHandler(gidToLidChangeHandler), + _gid(gid), + _serialNum(serialNum), + _enableNotifyPut(enableNotifyPut) { } @@ -20,6 +30,9 @@ PutDoneContext::~PutDoneContext() if (_docIdLimit != nullptr) { _docIdLimit->bumpUpLimit(_lid + 1); } + if (_enableNotifyPut) { + _gidToLidChangeHandler.notifyPut(_gid, _lid, _serialNum); + } } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h index ba94891fd6e..bddf9dabd90 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h @@ -3,12 +3,15 @@ #pragma once #include "operationdonecontext.h" +#include <vespa/document/base/globalid.h> +#include <vespa/searchlib/common/serialnum.h> namespace proton { class DocIdLimit; +class IGidToLidChangeHandler; /** * Context class for document put operations that acks operation when @@ -21,17 +24,25 @@ class PutDoneContext : public OperationDoneContext { uint32_t _lid; DocIdLimit *_docIdLimit; + IGidToLidChangeHandler &_gidToLidChangeHandler; + document::GlobalId _gid; + search::SerialNum _serialNum; + bool _enableNotifyPut; public: PutDoneContext(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics); + PerDocTypeFeedMetrics &metrics, + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, + uint32_t lid, + search::SerialNum serialNum, + bool enableNotifyPut); virtual ~PutDoneContext(); - void registerPutLid(uint32_t lid, DocIdLimit *docIdLimit) + void registerPutLid(DocIdLimit *docIdLimit) { - _lid = lid; _docIdLimit = docIdLimit; } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.cpp b/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.cpp new file mode 100644 index 00000000000..b0ece5f35a1 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.cpp @@ -0,0 +1,27 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "remove_batch_done_context.h" +#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> + +namespace proton { + +RemoveBatchDoneContext::RemoveBatchDoneContext(vespalib::Executor &executor, + vespalib::Executor::Task::UP task, + IGidToLidChangeHandler &gidToLidChangeHandler, + std::vector<document::GlobalId> gidsToRemove, + search::SerialNum serialNum) + : search::ScheduleTaskCallback(executor, std::move(task)), + _gidToLidChangeHandler(gidToLidChangeHandler), + _gidsToRemove(std::move(gidsToRemove)), + _serialNum(serialNum) +{ +} + +RemoveBatchDoneContext::~RemoveBatchDoneContext() +{ + for (const auto &gid : _gidsToRemove) { + _gidToLidChangeHandler.notifyRemoveDone(gid, _serialNum); + } +} + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.h b/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.h new file mode 100644 index 00000000000..2a93239574a --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.h @@ -0,0 +1,38 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/searchlib/common/scheduletaskcallback.h> +#include <vespa/document/base/globalid.h> +#include <vespa/searchlib/common/serialnum.h> +#include <vector> + +namespace proton +{ + +class IGidToLidChangeHandler; + +/** + * Context class for document batch remove that notifies gid to lid + * change handler about each remove done and schedules a + * task when instance is destroyed. Typically a shared pointer to an + * instance is passed around to multiple worker threads that performs + * portions of a larger task before dropping the shared pointer. + */ +class RemoveBatchDoneContext : public search::ScheduleTaskCallback +{ + IGidToLidChangeHandler &_gidToLidChangeHandler; + std::vector<document::GlobalId> _gidsToRemove; + search::SerialNum _serialNum; + +public: + RemoveBatchDoneContext(vespalib::Executor &executor, + vespalib::Executor::Task::UP task, + IGidToLidChangeHandler &gidToLidChangeHandler, + std::vector<document::GlobalId> gidsToRemove, + search::SerialNum serialNum); + + virtual ~RemoveBatchDoneContext(); +}; + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp index 219d7482853..522b0aed617 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp @@ -3,6 +3,7 @@ #include "removedonecontext.h" #include "removedonetask.h" #include <vespa/searchcore/proton/common/feedtoken.h> +#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> namespace proton { @@ -11,10 +12,18 @@ RemoveDoneContext::RemoveDoneContext(std::unique_ptr<FeedToken> token, PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, - uint32_t lid) + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, + uint32_t lid, + search::SerialNum serialNum, + bool enableNotifyRemoveDone) : OperationDoneContext(std::move(token), opType, metrics), _executor(executor), - _task() + _task(), + _gidToLidChangeHandler(gidToLidChangeHandler), + _gid(gid), + _serialNum(serialNum), + _enableNotifyRemoveDone(enableNotifyRemoveDone) { if (lid != 0) { _task = std::make_unique<RemoveDoneTask>(documentMetaStore, lid); @@ -23,6 +32,9 @@ RemoveDoneContext::RemoveDoneContext(std::unique_ptr<FeedToken> token, RemoveDoneContext::~RemoveDoneContext() { + if (_enableNotifyRemoveDone) { + _gidToLidChangeHandler.notifyRemoveDone(_gid, _serialNum); + } ack(); if (_task) { vespalib::Executor::Task::UP res = _executor.execute(std::move(_task)); diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h index b59219fc8c9..9311a6d2b6e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h @@ -4,11 +4,14 @@ #include "operationdonecontext.h" #include <vespa/vespalib/util/executor.h> +#include <vespa/document/base/globalid.h> +#include <vespa/searchlib/common/serialnum.h> namespace proton { class IDocumentMetaStore; +class IGidToLidChangeHandler; /** @@ -23,6 +26,10 @@ class RemoveDoneContext : public OperationDoneContext { vespalib::Executor &_executor; std::unique_ptr<vespalib::Executor::Task> _task; + IGidToLidChangeHandler &_gidToLidChangeHandler; + document::GlobalId _gid; + search::SerialNum _serialNum; + bool _enableNotifyRemoveDone; public: RemoveDoneContext(std::unique_ptr<FeedToken> token, @@ -30,7 +37,11 @@ public: PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, IDocumentMetaStore &documentMetaStore, - uint32_t lid); + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, + uint32_t lid, + search::SerialNum serialNum, + bool enableNotifyRemoveDone); virtual ~RemoveDoneContext(); }; diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 62e01db28e0..5f0fc3b8aa6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -7,6 +7,7 @@ #include "removedonecontext.h" #include "storeonlyfeedview.h" #include "updatedonecontext.h" +#include "remove_batch_done_context.h" #include <vespa/document/datatype/documenttype.h> #include <vespa/searchcore/proton/common/commit_time_tracker.h> #include <vespa/searchcore/proton/common/feedtoken.h> @@ -27,6 +28,7 @@ using document::DocumentUpdate; using search::index::Schema; using search::makeLambdaTask; using search::IDestructorCallback; +using search::SerialNum; using storage::spi::BucketInfoResult; using storage::spi::Timestamp; using vespalib::IllegalStateException; @@ -57,32 +59,37 @@ private: public: PutDoneContextForMove(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, - PerDocTypeFeedMetrics &metrics, IDestructorCallback::SP moveDoneCtx) - : PutDoneContext(std::move(token), opType, metrics), + PerDocTypeFeedMetrics &metrics, + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, + uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut, IDestructorCallback::SP moveDoneCtx) + : PutDoneContext(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut), _moveDoneCtx(std::move(moveDoneCtx)) {} virtual ~PutDoneContextForMove() {} }; std::shared_ptr<PutDoneContext> -createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, bool force, +createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, uint32_t lid, + SerialNum serialNum, bool enableNotifyPut, IDestructorCallback::SP moveDoneCtx) { std::shared_ptr<PutDoneContext> result; - if (token || force) { - if (moveDoneCtx) { - result = std::make_shared<PutDoneContextForMove>(std::move(token), opType, metrics, std::move(moveDoneCtx)); - } else { - result = std::make_shared<PutDoneContext>(std::move(token), opType, metrics); - } + if (moveDoneCtx) { + result = std::make_shared<PutDoneContextForMove>(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, std::move(moveDoneCtx)); + } else { + result = std::make_shared<PutDoneContext>(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut); } return result; } std::shared_ptr<PutDoneContext> -createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, bool force) +createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, uint32_t lid, SerialNum serialNum, bool enableNotifyPut) { - return createPutDoneContext(token, opType, metrics, force, IDestructorCallback::SP()); + return createPutDoneContext(token, opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, IDestructorCallback::SP()); } std::shared_ptr<UpdateDoneContext> @@ -109,9 +116,14 @@ private: public: RemoveDoneContextForMove(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, uint32_t lid, + IDocumentMetaStore &documentMetaStore, + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, + uint32_t lid, + SerialNum serialNum, + bool enableNotifyRemoveDone, IDestructorCallback::SP moveDoneCtx) - : RemoveDoneContext(std::move(token), opType, metrics, executor, documentMetaStore, lid), + : RemoveDoneContext(std::move(token), opType, metrics, executor, documentMetaStore, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyRemoveDone), _moveDoneCtx(std::move(moveDoneCtx)) {} virtual ~RemoveDoneContextForMove() {} @@ -120,15 +132,20 @@ public: std::shared_ptr<RemoveDoneContext> createRemoveDoneContext(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, uint32_t lid, + IDocumentMetaStore &documentMetaStore, + IGidToLidChangeHandler &gidToLidChangeHandler, + const document::GlobalId &gid, + uint32_t lid, + SerialNum serialNum, + bool enableNotifyRemoveDone, IDestructorCallback::SP moveDoneCtx) { if (moveDoneCtx) { return std::make_shared<RemoveDoneContextForMove> - (std::move(token), opType, metrics, executor, documentMetaStore, lid, std::move(moveDoneCtx)); + (std::move(token), opType, metrics, executor, documentMetaStore, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyRemoveDone, std::move(moveDoneCtx)); } else { return std::make_shared<RemoveDoneContext> - (std::move(token), opType, metrics, executor, documentMetaStore, lid); + (std::move(token), opType, metrics, executor, documentMetaStore, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyRemoveDone); } } @@ -281,7 +298,6 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) putOp.getSubDbId(), putOp.getLid(), putOp.getPrevSubDbId(), putOp.getPrevLid(), _params._subDbId, doc->toString(true).size(), doc->toString(true).c_str()); - uint32_t oldDocIdLimit = _metaStore.getCommittedDocIdLimit(); adjustMetaStore(putOp, docId); considerEarlyAck(token, putOp.getType()); @@ -289,16 +305,18 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) if (putOp.getValidDbdId(_params._subDbId)) { bool immediateCommit = _commitTimeTracker.needCommit(); + const document::GlobalId &gid = docId.getGlobalId(); std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(token, putOp.getType(), _params._metrics, - immediateCommit && putOp.getLid() >= oldDocIdLimit); + _gidToLidChangeHandler, gid, putOp.getLid(), serialNum, putOp.changedDbdId() && useDocumentMetaStore(serialNum)); putSummary(serialNum, putOp.getLid(), doc, onWriteDone); putAttributes(serialNum, putOp.getLid(), *doc, immediateCommit, onWriteDone); putIndexedFields(serialNum, putOp.getLid(), doc, immediateCommit, onWriteDone); } if (docAlreadyExists && putOp.changedDbdId()) { assert(!putOp.getValidDbdId(_params._subDbId)); - internalRemove(std::move(token), serialNum, putOp.getPrevLid(), putOp.getType(), IDestructorCallback::SP()); + const document::GlobalId &gid = docId.getGlobalId(); + internalRemove(std::move(token), serialNum, gid, putOp.getPrevLid(), putOp.getType(), useDocumentMetaStore(serialNum), IDestructorCallback::SP()); } if (token.get() != NULL) { token->ack(putOp.getType(), _params._metrics); @@ -575,7 +593,8 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm if (rmOp.getValidPrevDbdId(_params._subDbId)) { if (rmOp.changedDbdId()) { assert(!rmOp.getValidDbdId(_params._subDbId)); - internalRemove(std::move(token), serialNum, rmOp.getPrevLid(), rmOp.getType(), IDestructorCallback::SP()); + const document::GlobalId &gid = docId.getGlobalId(); + internalRemove(std::move(token), serialNum, gid, rmOp.getPrevLid(), rmOp.getType(), useDocumentMetaStore(serialNum), IDestructorCallback::SP()); } } if (token.get() != NULL) { @@ -584,19 +603,14 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm } void -StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, Lid lid, - FeedOperation::Type opType, IDestructorCallback::SP moveDoneCtx) +StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, const document::GlobalId &gid, Lid lid, + FeedOperation::Type opType, bool enableNotifyRemoveDone, IDestructorCallback::SP moveDoneCtx) { removeSummary(serialNum, lid); bool explicitReuseLid = _lidReuseDelayer.delayReuse(lid); std::shared_ptr<RemoveDoneContext> onWriteDone; - if (explicitReuseLid || token) { - onWriteDone = createRemoveDoneContext(std::move(token), opType, _params._metrics, _writeService.master(), - _metaStore, (explicitReuseLid ? lid : 0u), moveDoneCtx); - } else if (moveDoneCtx) { - onWriteDone = createRemoveDoneContext(FeedToken::UP(), opType, _params._metrics, _writeService.master(), - _metaStore, 0u, moveDoneCtx); - } + onWriteDone = createRemoveDoneContext(std::move(token), opType, _params._metrics, _writeService.master(), + _metaStore, _gidToLidChangeHandler, gid, (explicitReuseLid ? lid : 0u), serialNum, enableNotifyRemoveDone, moveDoneCtx); bool immediateCommit = _commitTimeTracker.needCommit(); removeAttributes(serialNum, lid, immediateCommit, onWriteDone); removeIndexedFields(serialNum, lid, immediateCommit, onWriteDone); @@ -613,17 +627,12 @@ StoreOnlyFeedView::adjustMetaStore(const DocumentOperation &op, const DocumentId op.getLid() != op.getPrevLid()) { moveMetaData(_metaStore, docId, op); - _gidToLidChangeHandler.notifyPut(docId.getGlobalId(), op.getLid(), serialNum); } else { putMetaData(_metaStore, docId, op, _params._subDbType == SubDbType::REMOVED); - if (op.getDbDocumentId() != op.getPrevDbDocumentId()) { - _gidToLidChangeHandler.notifyPut(docId.getGlobalId(), op.getLid(), serialNum); - } } } else if (op.getValidPrevDbdId(_params._subDbId)) { - removeMetaData(_metaStore, docId, op, _params._subDbType == SubDbType::REMOVED); _gidToLidChangeHandler.notifyRemove(docId.getGlobalId(), serialNum); - _gidToLidChangeHandler.notifyRemoveDone(docId.getGlobalId(), serialNum); + removeMetaData(_metaStore, docId, op, _params._subDbType == SubDbType::REMOVED); } _metaStore.commit(serialNum, serialNum); } @@ -650,23 +659,26 @@ StoreOnlyFeedView::removeDocuments(const RemoveDocumentsOperation &op, bool remo const LidVector &lidsToRemove(ctx->getLidVector()); bool useDMS = useDocumentMetaStore(serialNum); bool explicitReuseLids = false; + std::vector<document::GlobalId> gidsToRemove; if (useDMS) { - std::vector<document::GlobalId> gidsToRemove(getGidsToRemove(_metaStore, lidsToRemove)); - _metaStore.removeBatch(lidsToRemove, ctx->getDocIdLimit()); + gidsToRemove = getGidsToRemove(_metaStore, lidsToRemove); for (const auto &gid : gidsToRemove) { _gidToLidChangeHandler.notifyRemove(gid, serialNum); - _gidToLidChangeHandler.notifyRemoveDone(gid, serialNum); } + _metaStore.removeBatch(lidsToRemove, ctx->getDocIdLimit()); _metaStore.commit(serialNum, serialNum); explicitReuseLids = _lidReuseDelayer.delayReuse(lidsToRemove); } std::shared_ptr<search::IDestructorCallback> onWriteDone; + vespalib::Executor::Task::UP removeBatchDoneTask; + if (explicitReuseLids) { + removeBatchDoneTask = makeLambdaTask([=]() { _metaStore.removeBatchComplete(lidsToRemove); }); + } else { + removeBatchDoneTask = makeLambdaTask([]() {}); + } + onWriteDone = std::make_shared<RemoveBatchDoneContext>(_writeService.master(), std::move(removeBatchDoneTask), + _gidToLidChangeHandler, std::move(gidsToRemove), serialNum); if (remove_index_and_attributes) { - if (explicitReuseLids) { - onWriteDone = std::make_shared<search::ScheduleTaskCallback>( - _writeService.master(), - makeLambdaTask([=]() { _metaStore.removeBatchComplete(lidsToRemove); })); - } removeIndexedFields(serialNum, lidsToRemove, immediateCommit, onWriteDone); removeAttributes(serialNum, lidsToRemove, immediateCommit, onWriteDone); } @@ -675,9 +687,6 @@ StoreOnlyFeedView::removeDocuments(const RemoveDocumentsOperation &op, bool remo removeSummary(serialNum, lid); } } - if (explicitReuseLids && !onWriteDone) { - _metaStore.removeBatchComplete(lidsToRemove); - } return lidsToRemove.size(); } @@ -741,26 +750,29 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: moveOp.getSubDbId(), moveOp.getLid(), moveOp.getPrevSubDbId(), moveOp.getPrevLid(), _params._subDbId, doc->toString(true).size(), doc->toString(true).c_str()); - uint32_t oldDocIdLimit = _metaStore.getCommittedDocIdLimit(); adjustMetaStore(moveOp, docId); bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId); if (moveOp.getValidDbdId(_params._subDbId)) { bool immediateCommit = _commitTimeTracker.needCommit(); + const document::GlobalId &gid = docId.getGlobalId(); FeedToken::UP token; std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(token, moveOp.getType(), _params._metrics, - immediateCommit && (moveOp.getLid() >= oldDocIdLimit), doneCtx); + _gidToLidChangeHandler, gid, moveOp.getLid(), + serialNum, moveOp.changedDbdId() && useDocumentMetaStore(serialNum), doneCtx); putSummary(serialNum, moveOp.getLid(), doc, onWriteDone); putAttributes(serialNum, moveOp.getLid(), *doc, immediateCommit, onWriteDone); putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone); } if (docAlreadyExists && moveOp.changedDbdId()) { - internalRemove(FeedToken::UP(), serialNum, moveOp.getPrevLid(), moveOp.getType(), doneCtx); + const document::GlobalId &gid = docId.getGlobalId(); + bool enableNotifyRemoveDone = useDocumentMetaStore(serialNum) && !moveOp.getValidDbdId(_params._subDbId); + internalRemove(FeedToken::UP(), serialNum, gid, moveOp.getPrevLid(), moveOp.getType(), enableNotifyRemoveDone, doneCtx); } } void -StoreOnlyFeedView::heartBeat(search::SerialNum serialNum) +StoreOnlyFeedView::heartBeat(SerialNum serialNum) { assert(_writeService.master().isCurrentThread()); _metaStore.removeAllOldGenerations(); diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index 1302ef7b632..01a8122ed1e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -22,6 +22,8 @@ namespace search { class IDestructorCallback; } +namespace document { class GLobalId; } + namespace proton { class IReplayConfig; @@ -178,8 +180,8 @@ private: size_t removeDocuments(const RemoveDocumentsOperation &op, bool remove_index_and_attribute_fields, bool immediateCommit); - void internalRemove(FeedTokenUP token, SerialNum serialNum, Lid lid, - FeedOperation::Type opType, std::shared_ptr<search::IDestructorCallback> moveDoneCtx); + void internalRemove(FeedTokenUP token, SerialNum serialNum, const document::GlobalId &gid, Lid lid, + FeedOperation::Type opType, bool enableNotifyRemoveDone, std::shared_ptr<search::IDestructorCallback> moveDoneCtx); // Ack token early if visibility delay is nonzero void considerEarlyAck(FeedTokenUP &token, FeedOperation::Type opType); diff --git a/searchlib/src/vespa/searchlib/common/scheduletaskcallback.h b/searchlib/src/vespa/searchlib/common/scheduletaskcallback.h index 4e12fb9d1b8..27bbe751532 100644 --- a/searchlib/src/vespa/searchlib/common/scheduletaskcallback.h +++ b/searchlib/src/vespa/searchlib/common/scheduletaskcallback.h @@ -2,6 +2,7 @@ #pragma once #include "idestructorcallback.h" +#include <vespa/vespalib/util/executor.h> namespace search { |