diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-04-28 17:02:23 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-04-28 17:03:18 +0000 |
commit | dbeab9300477aa5db0362d2a6424d1a2b148253b (patch) | |
tree | a3246b077bc238a5319d0b072d7ec4ea1e3954e1 /searchcore | |
parent | 269217e637cb87b797e35b80d0e7017ff47bd7f2 (diff) |
Ensure that we post a dummy task that will wait for makeUpdatedDocument to complete.
Diffstat (limited to 'searchcore')
3 files changed, 36 insertions, 17 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index e4d2d3b3f16..bd7e57dc56e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -779,9 +779,7 @@ FeedHandler::sync() FeedHandler::RPC::Result FeedHandler::receive(const Packet &packet) { - // Called directly when replaying transaction log - // (by fnet thread). Called via DocumentDB::recoverPacket() when - // recovering from another node. + // Called directly when replaying transaction log (by fnet thread). FeedStateSP state = getFeedState(); auto wrap = make_shared<PacketWrapper>(packet, _tlsReplayProgress.get()); state->receive(wrap, _writeService.master()); diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 552d2b84179..6066fef68d8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -369,8 +369,9 @@ StoreOnlyFeedView::handleUpdate(FeedToken token, const UpdateOperation &updOp) internalUpdate(std::move(token), updOp); } -void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, - FutureStream futureStream, OnOperationDoneType onDone) +void +StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, + FutureStream futureStream, OnOperationDoneType onDone) { summaryExecutor().execute( makeLambdaTask([serialNum, lid, futureStream = std::move(futureStream), trackerToken = _pendingLidsForDocStore.produce(lid), onDone, this] () mutable { @@ -383,7 +384,20 @@ void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, })); } -void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, Document::SP doc, OnOperationDoneType onDone) +void +StoreOnlyFeedView::putSummaryNoop(SerialNum serialNum, Lid lid, + FutureStream futureStream, OnOperationDoneType onDone) +{ + summaryExecutor().execute( + makeLambdaTask([serialNum, lid, futureStream = std::move(futureStream), onDone] () mutable { + (void) onDone; + vespalib::nbostream os = futureStream.get(); + (void) os; + })); +} + +void +StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, Document::SP doc, OnOperationDoneType onDone) { summaryExecutor().execute( makeLambdaTask([serialNum, doc = std::move(doc), trackerToken = _pendingLidsForDocStore.produce(lid), onDone, lid, this] { @@ -392,7 +406,8 @@ void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, Document::SP do _summaryAdapter->put(serialNum, lid, *doc); })); } -void StoreOnlyFeedView::removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone) { +void +StoreOnlyFeedView::removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone) { summaryExecutor().execute( makeLambdaTask([serialNum, lid, onDone, trackerToken = _pendingLidsForDocStore.produce(lid), this] { (void) onDone; @@ -400,7 +415,9 @@ void StoreOnlyFeedView::removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneT _summaryAdapter->remove(serialNum, lid); })); } -void StoreOnlyFeedView::heartBeatSummary(SerialNum serialNum) { + +void +StoreOnlyFeedView::heartBeatSummary(SerialNum serialNum) { summaryExecutor().execute( makeLambdaTask([serialNum, this] { _summaryAdapter->heartBeat(serialNum); @@ -467,14 +484,17 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) } PromisedStream promisedStream; FutureStream futureStream = promisedStream.get_future(); - if (useDocumentStore(serialNum)) { + bool useDocStore = useDocumentStore(serialNum); + if (useDocStore) { putSummary(serialNum, lid, std::move(futureStream), onWriteDone); + } else { + putSummaryNoop(serialNum, lid, std::move(futureStream), onWriteDone); } _writeService.shared().execute(makeLambdaTask( - [upd = updOp.getUpdate(), serialNum, lid, onWriteDone, promisedDoc = std::move(promisedDoc), + [upd = updOp.getUpdate(), useDocStore, lid, onWriteDone, promisedDoc = std::move(promisedDoc), promisedStream = std::move(promisedStream), this]() mutable { - makeUpdatedDocument(serialNum, lid, *upd, onWriteDone, + makeUpdatedDocument(useDocStore, lid, *upd, onWriteDone, std::move(promisedDoc), std::move(promisedStream)); })); updateAttributes(serialNum, lid, std::move(futureDoc), onWriteDone); @@ -482,15 +502,15 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) } void -StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Lid lid, const DocumentUpdate & update, +StoreOnlyFeedView::makeUpdatedDocument(bool useDocStore, Lid lid, const DocumentUpdate & update, OnOperationDoneType onWriteDone, PromisedDoc promisedDoc, PromisedStream promisedStream) { Document::UP prevDoc = _summaryAdapter->get(lid, *_repo); Document::UP newDoc; vespalib::nbostream newStream(12345); - assert(!onWriteDone->hasToken() || useDocumentStore(serialNum)); - if (useDocumentStore(serialNum)) { + assert(!onWriteDone->hasToken() || useDocStore); + if (useDocStore) { assert(prevDoc); } if (!prevDoc) { @@ -502,14 +522,14 @@ StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Lid lid, const Docum } else { if (update.getId() == prevDoc->getId()) { newDoc = std::move(prevDoc); - if (useDocumentStore(serialNum)) { + if (useDocStore) { update.applyTo(*newDoc); newDoc->serialize(newStream); } } else { // Replaying, document removed and lid reused before summary // was flushed. - assert(!onWriteDone->hasToken() && !useDocumentStore(serialNum)); + assert(!onWriteDone->hasToken() && !useDocStore); } } promisedDoc.set_value(std::move(newDoc)); diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index c8d23b1ab58..7f1876cbbdf 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -156,6 +156,7 @@ private: return _writeService.summary(); } void putSummary(SerialNum serialNum, Lid lid, FutureStream doc, OnOperationDoneType onDone); + void putSummaryNoop(SerialNum serialNum, Lid lid, FutureStream doc, OnOperationDoneType onDone); void putSummary(SerialNum serialNum, Lid lid, DocumentSP doc, OnOperationDoneType onDone); void removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone); void heartBeatSummary(SerialNum serialNum); @@ -184,7 +185,7 @@ private: IPendingLidTracker::Token get_pending_lid_token(const DocumentOperation &op); - void makeUpdatedDocument(SerialNum serialNum, Lid lid, const DocumentUpdate & update, OnOperationDoneType onWriteDone, + void makeUpdatedDocument(bool useDocStore, Lid lid, const DocumentUpdate & update, OnOperationDoneType onWriteDone, PromisedDoc promisedDoc, PromisedStream promisedStream); protected: |