aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-04-28 17:02:23 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-04-28 17:03:18 +0000
commitdbeab9300477aa5db0362d2a6424d1a2b148253b (patch)
treea3246b077bc238a5319d0b072d7ec4ea1e3954e1 /searchcore
parent269217e637cb87b797e35b80d0e7017ff47bd7f2 (diff)
Ensure that we post a dummy task that will wait for makeUpdatedDocument to complete.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp46
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h3
3 files changed, 36 insertions, 17 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index e4d2d3b3f16..bd7e57dc56e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -779,9 +779,7 @@ FeedHandler::sync()
FeedHandler::RPC::Result
FeedHandler::receive(const Packet &packet)
{
- // Called directly when replaying transaction log
- // (by fnet thread). Called via DocumentDB::recoverPacket() when
- // recovering from another node.
+ // Called directly when replaying transaction log (by fnet thread).
FeedStateSP state = getFeedState();
auto wrap = make_shared<PacketWrapper>(packet, _tlsReplayProgress.get());
state->receive(wrap, _writeService.master());
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 552d2b84179..6066fef68d8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -369,8 +369,9 @@ StoreOnlyFeedView::handleUpdate(FeedToken token, const UpdateOperation &updOp)
internalUpdate(std::move(token), updOp);
}
-void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid,
- FutureStream futureStream, OnOperationDoneType onDone)
+void
+StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid,
+ FutureStream futureStream, OnOperationDoneType onDone)
{
summaryExecutor().execute(
makeLambdaTask([serialNum, lid, futureStream = std::move(futureStream), trackerToken = _pendingLidsForDocStore.produce(lid), onDone, this] () mutable {
@@ -383,7 +384,20 @@ void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid,
}));
}
-void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, Document::SP doc, OnOperationDoneType onDone)
+void
+StoreOnlyFeedView::putSummaryNoop(SerialNum serialNum, Lid lid,
+ FutureStream futureStream, OnOperationDoneType onDone)
+{
+ summaryExecutor().execute(
+ makeLambdaTask([serialNum, lid, futureStream = std::move(futureStream), onDone] () mutable {
+ (void) onDone;
+ vespalib::nbostream os = futureStream.get();
+ (void) os;
+ }));
+}
+
+void
+StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, Document::SP doc, OnOperationDoneType onDone)
{
summaryExecutor().execute(
makeLambdaTask([serialNum, doc = std::move(doc), trackerToken = _pendingLidsForDocStore.produce(lid), onDone, lid, this] {
@@ -392,7 +406,8 @@ void StoreOnlyFeedView::putSummary(SerialNum serialNum, Lid lid, Document::SP do
_summaryAdapter->put(serialNum, lid, *doc);
}));
}
-void StoreOnlyFeedView::removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone) {
+void
+StoreOnlyFeedView::removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone) {
summaryExecutor().execute(
makeLambdaTask([serialNum, lid, onDone, trackerToken = _pendingLidsForDocStore.produce(lid), this] {
(void) onDone;
@@ -400,7 +415,9 @@ void StoreOnlyFeedView::removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneT
_summaryAdapter->remove(serialNum, lid);
}));
}
-void StoreOnlyFeedView::heartBeatSummary(SerialNum serialNum) {
+
+void
+StoreOnlyFeedView::heartBeatSummary(SerialNum serialNum) {
summaryExecutor().execute(
makeLambdaTask([serialNum, this] {
_summaryAdapter->heartBeat(serialNum);
@@ -467,14 +484,17 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp)
}
PromisedStream promisedStream;
FutureStream futureStream = promisedStream.get_future();
- if (useDocumentStore(serialNum)) {
+ bool useDocStore = useDocumentStore(serialNum);
+ if (useDocStore) {
putSummary(serialNum, lid, std::move(futureStream), onWriteDone);
+ } else {
+ putSummaryNoop(serialNum, lid, std::move(futureStream), onWriteDone);
}
_writeService.shared().execute(makeLambdaTask(
- [upd = updOp.getUpdate(), serialNum, lid, onWriteDone, promisedDoc = std::move(promisedDoc),
+ [upd = updOp.getUpdate(), useDocStore, lid, onWriteDone, promisedDoc = std::move(promisedDoc),
promisedStream = std::move(promisedStream), this]() mutable
{
- makeUpdatedDocument(serialNum, lid, *upd, onWriteDone,
+ makeUpdatedDocument(useDocStore, lid, *upd, onWriteDone,
std::move(promisedDoc), std::move(promisedStream));
}));
updateAttributes(serialNum, lid, std::move(futureDoc), onWriteDone);
@@ -482,15 +502,15 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp)
}
void
-StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Lid lid, const DocumentUpdate & update,
+StoreOnlyFeedView::makeUpdatedDocument(bool useDocStore, Lid lid, const DocumentUpdate & update,
OnOperationDoneType onWriteDone, PromisedDoc promisedDoc,
PromisedStream promisedStream)
{
Document::UP prevDoc = _summaryAdapter->get(lid, *_repo);
Document::UP newDoc;
vespalib::nbostream newStream(12345);
- assert(!onWriteDone->hasToken() || useDocumentStore(serialNum));
- if (useDocumentStore(serialNum)) {
+ assert(!onWriteDone->hasToken() || useDocStore);
+ if (useDocStore) {
assert(prevDoc);
}
if (!prevDoc) {
@@ -502,14 +522,14 @@ StoreOnlyFeedView::makeUpdatedDocument(SerialNum serialNum, Lid lid, const Docum
} else {
if (update.getId() == prevDoc->getId()) {
newDoc = std::move(prevDoc);
- if (useDocumentStore(serialNum)) {
+ if (useDocStore) {
update.applyTo(*newDoc);
newDoc->serialize(newStream);
}
} else {
// Replaying, document removed and lid reused before summary
// was flushed.
- assert(!onWriteDone->hasToken() && !useDocumentStore(serialNum));
+ assert(!onWriteDone->hasToken() && !useDocStore);
}
}
promisedDoc.set_value(std::move(newDoc));
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index c8d23b1ab58..7f1876cbbdf 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -156,6 +156,7 @@ private:
return _writeService.summary();
}
void putSummary(SerialNum serialNum, Lid lid, FutureStream doc, OnOperationDoneType onDone);
+ void putSummaryNoop(SerialNum serialNum, Lid lid, FutureStream doc, OnOperationDoneType onDone);
void putSummary(SerialNum serialNum, Lid lid, DocumentSP doc, OnOperationDoneType onDone);
void removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone);
void heartBeatSummary(SerialNum serialNum);
@@ -184,7 +185,7 @@ private:
IPendingLidTracker::Token get_pending_lid_token(const DocumentOperation &op);
- void makeUpdatedDocument(SerialNum serialNum, Lid lid, const DocumentUpdate & update, OnOperationDoneType onWriteDone,
+ void makeUpdatedDocument(bool useDocStore, Lid lid, const DocumentUpdate & update, OnOperationDoneType onWriteDone,
PromisedDoc promisedDoc, PromisedStream promisedStream);
protected: