diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-08-04 22:06:33 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-08-04 22:06:33 +0200 |
commit | 9c0310165ac26d8151a0a58d32bcc92260c8b422 (patch) | |
tree | 787d4b4febd6a6310832cba5d1f16d06270eb6b4 /searchcore | |
parent | 89b3dac6831e9c48d031029b34d1c83831c96239 (diff) |
Ensure ack is delayed until visibility is achieved.
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp | 22 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h | 4 |
2 files changed, 16 insertions, 10 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 1998fe806cc..b0fbee46320 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -288,11 +288,11 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp) bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId); if (putOp.getValidDbdId(_params._subDbId)) { - putSummary(serialNum, putOp.getLid(), doc); bool immediateCommit = _commitTimeTracker.needCommit(); std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(token, putOp.getType(), _params._metrics, immediateCommit && putOp.getLid() >= oldDocIdLimit); + putSummary(serialNum, putOp.getLid(), doc, onWriteDone); putAttributes(serialNum, putOp.getLid(), *doc, immediateCommit, onWriteDone); putIndexedFields(serialNum, putOp.getLid(), doc, immediateCommit, onWriteDone); } @@ -350,10 +350,13 @@ StoreOnlyFeedView::handleUpdate(FeedToken *token, const UpdateOperation &updOp) internalUpdate(dupFeedToken(token), updOp); } -void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid, FutureDoc futureDoc) { +void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid, + FutureDoc futureDoc, OnOperationDoneType onDone) +{ _pendingLidTracker.produce(lid); summaryExecutor().execute( - makeLambdaTask([serialNum, futureDoc, lid, this] { + makeLambdaTask([serialNum, futureDoc = std::move(futureDoc), lid, onDone, this] { + (void) onDone; const Document::UP & doc = futureDoc.get(); if (doc) { _summaryAdapter->put(serialNum, *futureDoc.get(), lid); @@ -361,10 +364,13 @@ void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid _pendingLidTracker.consume(lid); })); } -void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid, Document::SP doc) { +void StoreOnlyFeedView::putSummary(SerialNum serialNum, search::DocumentIdT lid, + Document::SP doc, OnOperationDoneType onDone) +{ _pendingLidTracker.produce(lid); summaryExecutor().execute( - makeLambdaTask([serialNum, doc = std::move(doc), lid, this] { + makeLambdaTask([serialNum, doc = std::move(doc), onDone, lid, this] { + (void) onDone; _summaryAdapter->put(serialNum, *doc, lid); _pendingLidTracker.consume(lid); })); @@ -427,7 +433,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &up updateIndexedFields(serialNum, lid, futureDoc, immediateCommit, onWriteDone); } if (useDocumentStore(serialNum)) { - putSummary(serialNum, lid, futureDoc); + putSummary(serialNum, lid, futureDoc, onWriteDone); } Document::UP prevDoc(_summaryAdapter->get(lid, *_repo)); @@ -549,7 +555,7 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm Document::UP clearDoc(new Document(*_docType, docId)); clearDoc->setRepo(*_repo); - putSummary(serialNum, rmOp.getLid(), std::move(clearDoc)); + putSummary(serialNum, rmOp.getLid(), std::move(clearDoc), std::shared_ptr<OperationDoneContext>()); } if (rmOp.getValidPrevDbdId(_params._subDbId)) { if (rmOp.changedDbdId()) { @@ -722,12 +728,12 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: adjustMetaStore(moveOp, docId); bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId); if (moveOp.getValidDbdId(_params._subDbId)) { - putSummary(serialNum, moveOp.getLid(), doc); bool immediateCommit = _commitTimeTracker.needCommit(); FeedToken::UP token; std::shared_ptr<PutDoneContext> onWriteDone = createPutDoneContext(token, moveOp.getType(), _params._metrics, immediateCommit && (moveOp.getLid() >= oldDocIdLimit), doneCtx); + putSummary(serialNum, moveOp.getLid(), doc, onWriteDone); putAttributes(serialNum, moveOp.getLid(), *doc, immediateCommit, onWriteDone); putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone); } diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index e12a13d3f8a..e0a871c9cc6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -145,8 +145,8 @@ private: searchcorespi::index::IThreadService & summaryExecutor() { return _writeService.summary(); } - void putSummary(SerialNum serialNum, search::DocumentIdT lid, FutureDoc doc); - void putSummary(SerialNum serialNum, search::DocumentIdT lid, Document::SP doc); + void putSummary(SerialNum serialNum, search::DocumentIdT lid, FutureDoc doc, OnOperationDoneType onDone); + void putSummary(SerialNum serialNum, search::DocumentIdT lid, Document::SP doc, OnOperationDoneType onDone); void removeSummary(SerialNum serialNum, search::DocumentIdT lid); void heartBeatSummary(SerialNum serialNum); |