summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-09-07 14:24:38 +0200
committerGitHub <noreply@github.com>2017-09-07 14:24:38 +0200
commit96c7b6bb6930a2340680d055a0c8b2e11bc69d81 (patch)
treed22a557e63f8d5fcf3dc93c81d69a21ca1058dbb
parent9c8162a9f042c5cf6bebe6771a74c851d0f419ad (diff)
parenteccf4dd0680b829869ce9e2e9920ad9339c0d37c (diff)
Merge pull request #3361 from vespa-engine/toregge/delay-put-and-removedone-notifications
Delay notifyPut and notifyRemoveDone
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp19
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/putdonecontext.h17
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.cpp27
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.h38
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp16
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonecontext.h13
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp112
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h6
-rw-r--r--searchlib/src/vespa/searchlib/common/scheduletaskcallback.h1
11 files changed, 190 insertions, 62 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
index 70adc7e47b6..c5b0cd70527 100644
--- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
@@ -77,6 +77,7 @@ vespa_add_library(searchcore_server STATIC
pruneremoveddocumentsjob.cpp
putdonecontext.cpp
reconfig_params.cpp
+ remove_batch_done_context.cpp
removedonecontext.cpp
removedonetask.cpp
replaypacketdispatcher.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp
index 2444223c96f..e46caca4fba 100644
--- a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp
@@ -44,7 +44,7 @@ FastAccessFeedView::putAttributes(SerialNum serialNum,
{
_attributeWriter->put(serialNum, doc, lid, immediateCommit, onWriteDone);
if (immediateCommit && onWriteDone) {
- onWriteDone->registerPutLid(lid, &_docIdLimit);
+ onWriteDone->registerPutLid(&_docIdLimit);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
index ad50e3a92aa..f05d8bc0823 100644
--- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
@@ -3,15 +3,25 @@
#include "putdonecontext.h"
#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchcore/proton/common/docid_limit.h>
+#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h>
namespace proton {
PutDoneContext::PutDoneContext(std::unique_ptr<FeedToken> token,
const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics)
+ PerDocTypeFeedMetrics &metrics,
+ IGidToLidChangeHandler &gidToLidChangeHandler,
+ const document::GlobalId &gid,
+ uint32_t lid,
+ search::SerialNum serialNum,
+ bool enableNotifyPut)
: OperationDoneContext(std::move(token), opType, metrics),
- _lid(0),
- _docIdLimit(nullptr)
+ _lid(lid),
+ _docIdLimit(nullptr),
+ _gidToLidChangeHandler(gidToLidChangeHandler),
+ _gid(gid),
+ _serialNum(serialNum),
+ _enableNotifyPut(enableNotifyPut)
{
}
@@ -20,6 +30,9 @@ PutDoneContext::~PutDoneContext()
if (_docIdLimit != nullptr) {
_docIdLimit->bumpUpLimit(_lid + 1);
}
+ if (_enableNotifyPut) {
+ _gidToLidChangeHandler.notifyPut(_gid, _lid, _serialNum);
+ }
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
index ba94891fd6e..bddf9dabd90 100644
--- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
@@ -3,12 +3,15 @@
#pragma once
#include "operationdonecontext.h"
+#include <vespa/document/base/globalid.h>
+#include <vespa/searchlib/common/serialnum.h>
namespace proton
{
class DocIdLimit;
+class IGidToLidChangeHandler;
/**
* Context class for document put operations that acks operation when
@@ -21,17 +24,25 @@ class PutDoneContext : public OperationDoneContext
{
uint32_t _lid;
DocIdLimit *_docIdLimit;
+ IGidToLidChangeHandler &_gidToLidChangeHandler;
+ document::GlobalId _gid;
+ search::SerialNum _serialNum;
+ bool _enableNotifyPut;
public:
PutDoneContext(std::unique_ptr<FeedToken> token,
const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics);
+ PerDocTypeFeedMetrics &metrics,
+ IGidToLidChangeHandler &gidToLidChangeHandler,
+ const document::GlobalId &gid,
+ uint32_t lid,
+ search::SerialNum serialNum,
+ bool enableNotifyPut);
virtual ~PutDoneContext();
- void registerPutLid(uint32_t lid, DocIdLimit *docIdLimit)
+ void registerPutLid(DocIdLimit *docIdLimit)
{
- _lid = lid;
_docIdLimit = docIdLimit;
}
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.cpp b/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.cpp
new file mode 100644
index 00000000000..b0ece5f35a1
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.cpp
@@ -0,0 +1,27 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "remove_batch_done_context.h"
+#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h>
+
+namespace proton {
+
+RemoveBatchDoneContext::RemoveBatchDoneContext(vespalib::Executor &executor,
+ vespalib::Executor::Task::UP task,
+ IGidToLidChangeHandler &gidToLidChangeHandler,
+ std::vector<document::GlobalId> gidsToRemove,
+ search::SerialNum serialNum)
+ : search::ScheduleTaskCallback(executor, std::move(task)),
+ _gidToLidChangeHandler(gidToLidChangeHandler),
+ _gidsToRemove(std::move(gidsToRemove)),
+ _serialNum(serialNum)
+{
+}
+
+RemoveBatchDoneContext::~RemoveBatchDoneContext()
+{
+ for (const auto &gid : _gidsToRemove) {
+ _gidToLidChangeHandler.notifyRemoveDone(gid, _serialNum);
+ }
+}
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.h b/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.h
new file mode 100644
index 00000000000..2a93239574a
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/remove_batch_done_context.h
@@ -0,0 +1,38 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/searchlib/common/scheduletaskcallback.h>
+#include <vespa/document/base/globalid.h>
+#include <vespa/searchlib/common/serialnum.h>
+#include <vector>
+
+namespace proton
+{
+
+class IGidToLidChangeHandler;
+
+/**
+ * Context class for document batch remove that notifies gid to lid
+ * change handler about each remove done and schedules a
+ * task when instance is destroyed. Typically a shared pointer to an
+ * instance is passed around to multiple worker threads that performs
+ * portions of a larger task before dropping the shared pointer.
+ */
+class RemoveBatchDoneContext : public search::ScheduleTaskCallback
+{
+ IGidToLidChangeHandler &_gidToLidChangeHandler;
+ std::vector<document::GlobalId> _gidsToRemove;
+ search::SerialNum _serialNum;
+
+public:
+ RemoveBatchDoneContext(vespalib::Executor &executor,
+ vespalib::Executor::Task::UP task,
+ IGidToLidChangeHandler &gidToLidChangeHandler,
+ std::vector<document::GlobalId> gidsToRemove,
+ search::SerialNum serialNum);
+
+ virtual ~RemoveBatchDoneContext();
+};
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
index 219d7482853..522b0aed617 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
@@ -3,6 +3,7 @@
#include "removedonecontext.h"
#include "removedonetask.h"
#include <vespa/searchcore/proton/common/feedtoken.h>
+#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h>
namespace proton {
@@ -11,10 +12,18 @@ RemoveDoneContext::RemoveDoneContext(std::unique_ptr<FeedToken> token,
PerDocTypeFeedMetrics &metrics,
vespalib::Executor &executor,
IDocumentMetaStore &documentMetaStore,
- uint32_t lid)
+ IGidToLidChangeHandler &gidToLidChangeHandler,
+ const document::GlobalId &gid,
+ uint32_t lid,
+ search::SerialNum serialNum,
+ bool enableNotifyRemoveDone)
: OperationDoneContext(std::move(token), opType, metrics),
_executor(executor),
- _task()
+ _task(),
+ _gidToLidChangeHandler(gidToLidChangeHandler),
+ _gid(gid),
+ _serialNum(serialNum),
+ _enableNotifyRemoveDone(enableNotifyRemoveDone)
{
if (lid != 0) {
_task = std::make_unique<RemoveDoneTask>(documentMetaStore, lid);
@@ -23,6 +32,9 @@ RemoveDoneContext::RemoveDoneContext(std::unique_ptr<FeedToken> token,
RemoveDoneContext::~RemoveDoneContext()
{
+ if (_enableNotifyRemoveDone) {
+ _gidToLidChangeHandler.notifyRemoveDone(_gid, _serialNum);
+ }
ack();
if (_task) {
vespalib::Executor::Task::UP res = _executor.execute(std::move(_task));
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
index b59219fc8c9..9311a6d2b6e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
@@ -4,11 +4,14 @@
#include "operationdonecontext.h"
#include <vespa/vespalib/util/executor.h>
+#include <vespa/document/base/globalid.h>
+#include <vespa/searchlib/common/serialnum.h>
namespace proton
{
class IDocumentMetaStore;
+class IGidToLidChangeHandler;
/**
@@ -23,6 +26,10 @@ class RemoveDoneContext : public OperationDoneContext
{
vespalib::Executor &_executor;
std::unique_ptr<vespalib::Executor::Task> _task;
+ IGidToLidChangeHandler &_gidToLidChangeHandler;
+ document::GlobalId _gid;
+ search::SerialNum _serialNum;
+ bool _enableNotifyRemoveDone;
public:
RemoveDoneContext(std::unique_ptr<FeedToken> token,
@@ -30,7 +37,11 @@ public:
PerDocTypeFeedMetrics &metrics,
vespalib::Executor &executor,
IDocumentMetaStore &documentMetaStore,
- uint32_t lid);
+ IGidToLidChangeHandler &gidToLidChangeHandler,
+ const document::GlobalId &gid,
+ uint32_t lid,
+ search::SerialNum serialNum,
+ bool enableNotifyRemoveDone);
virtual ~RemoveDoneContext();
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 62e01db28e0..5f0fc3b8aa6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -7,6 +7,7 @@
#include "removedonecontext.h"
#include "storeonlyfeedview.h"
#include "updatedonecontext.h"
+#include "remove_batch_done_context.h"
#include <vespa/document/datatype/documenttype.h>
#include <vespa/searchcore/proton/common/commit_time_tracker.h>
#include <vespa/searchcore/proton/common/feedtoken.h>
@@ -27,6 +28,7 @@ using document::DocumentUpdate;
using search::index::Schema;
using search::makeLambdaTask;
using search::IDestructorCallback;
+using search::SerialNum;
using storage::spi::BucketInfoResult;
using storage::spi::Timestamp;
using vespalib::IllegalStateException;
@@ -57,32 +59,37 @@ private:
public:
PutDoneContextForMove(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics, IDestructorCallback::SP moveDoneCtx)
- : PutDoneContext(std::move(token), opType, metrics),
+ PerDocTypeFeedMetrics &metrics,
+ IGidToLidChangeHandler &gidToLidChangeHandler,
+ const document::GlobalId &gid,
+ uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut, IDestructorCallback::SP moveDoneCtx)
+ : PutDoneContext(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut),
_moveDoneCtx(std::move(moveDoneCtx))
{}
virtual ~PutDoneContextForMove() {}
};
std::shared_ptr<PutDoneContext>
-createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, bool force,
+createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics,
+ IGidToLidChangeHandler &gidToLidChangeHandler,
+ const document::GlobalId &gid, uint32_t lid,
+ SerialNum serialNum, bool enableNotifyPut,
IDestructorCallback::SP moveDoneCtx)
{
std::shared_ptr<PutDoneContext> result;
- if (token || force) {
- if (moveDoneCtx) {
- result = std::make_shared<PutDoneContextForMove>(std::move(token), opType, metrics, std::move(moveDoneCtx));
- } else {
- result = std::make_shared<PutDoneContext>(std::move(token), opType, metrics);
- }
+ if (moveDoneCtx) {
+ result = std::make_shared<PutDoneContextForMove>(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, std::move(moveDoneCtx));
+ } else {
+ result = std::make_shared<PutDoneContext>(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut);
}
return result;
}
std::shared_ptr<PutDoneContext>
-createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, bool force)
+createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, IGidToLidChangeHandler &gidToLidChangeHandler,
+ const document::GlobalId &gid, uint32_t lid, SerialNum serialNum, bool enableNotifyPut)
{
- return createPutDoneContext(token, opType, metrics, force, IDestructorCallback::SP());
+ return createPutDoneContext(token, opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, IDestructorCallback::SP());
}
std::shared_ptr<UpdateDoneContext>
@@ -109,9 +116,14 @@ private:
public:
RemoveDoneContextForMove(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType,
PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor,
- IDocumentMetaStore &documentMetaStore, uint32_t lid,
+ IDocumentMetaStore &documentMetaStore,
+ IGidToLidChangeHandler &gidToLidChangeHandler,
+ const document::GlobalId &gid,
+ uint32_t lid,
+ SerialNum serialNum,
+ bool enableNotifyRemoveDone,
IDestructorCallback::SP moveDoneCtx)
- : RemoveDoneContext(std::move(token), opType, metrics, executor, documentMetaStore, lid),
+ : RemoveDoneContext(std::move(token), opType, metrics, executor, documentMetaStore, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyRemoveDone),
_moveDoneCtx(std::move(moveDoneCtx))
{}
virtual ~RemoveDoneContextForMove() {}
@@ -120,15 +132,20 @@ public:
std::shared_ptr<RemoveDoneContext>
createRemoveDoneContext(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType,
PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor,
- IDocumentMetaStore &documentMetaStore, uint32_t lid,
+ IDocumentMetaStore &documentMetaStore,
+ IGidToLidChangeHandler &gidToLidChangeHandler,
+ const document::GlobalId &gid,
+ uint32_t lid,
+ SerialNum serialNum,
+ bool enableNotifyRemoveDone,
IDestructorCallback::SP moveDoneCtx)
{
if (moveDoneCtx) {
return std::make_shared<RemoveDoneContextForMove>
- (std::move(token), opType, metrics, executor, documentMetaStore, lid, std::move(moveDoneCtx));
+ (std::move(token), opType, metrics, executor, documentMetaStore, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyRemoveDone, std::move(moveDoneCtx));
} else {
return std::make_shared<RemoveDoneContext>
- (std::move(token), opType, metrics, executor, documentMetaStore, lid);
+ (std::move(token), opType, metrics, executor, documentMetaStore, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyRemoveDone);
}
}
@@ -281,7 +298,6 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp)
putOp.getSubDbId(), putOp.getLid(), putOp.getPrevSubDbId(), putOp.getPrevLid(),
_params._subDbId, doc->toString(true).size(), doc->toString(true).c_str());
- uint32_t oldDocIdLimit = _metaStore.getCommittedDocIdLimit();
adjustMetaStore(putOp, docId);
considerEarlyAck(token, putOp.getType());
@@ -289,16 +305,18 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp)
if (putOp.getValidDbdId(_params._subDbId)) {
bool immediateCommit = _commitTimeTracker.needCommit();
+ const document::GlobalId &gid = docId.getGlobalId();
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(token, putOp.getType(), _params._metrics,
- immediateCommit && putOp.getLid() >= oldDocIdLimit);
+ _gidToLidChangeHandler, gid, putOp.getLid(), serialNum, putOp.changedDbdId() && useDocumentMetaStore(serialNum));
putSummary(serialNum, putOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, putOp.getLid(), *doc, immediateCommit, onWriteDone);
putIndexedFields(serialNum, putOp.getLid(), doc, immediateCommit, onWriteDone);
}
if (docAlreadyExists && putOp.changedDbdId()) {
assert(!putOp.getValidDbdId(_params._subDbId));
- internalRemove(std::move(token), serialNum, putOp.getPrevLid(), putOp.getType(), IDestructorCallback::SP());
+ const document::GlobalId &gid = docId.getGlobalId();
+ internalRemove(std::move(token), serialNum, gid, putOp.getPrevLid(), putOp.getType(), useDocumentMetaStore(serialNum), IDestructorCallback::SP());
}
if (token.get() != NULL) {
token->ack(putOp.getType(), _params._metrics);
@@ -575,7 +593,8 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm
if (rmOp.getValidPrevDbdId(_params._subDbId)) {
if (rmOp.changedDbdId()) {
assert(!rmOp.getValidDbdId(_params._subDbId));
- internalRemove(std::move(token), serialNum, rmOp.getPrevLid(), rmOp.getType(), IDestructorCallback::SP());
+ const document::GlobalId &gid = docId.getGlobalId();
+ internalRemove(std::move(token), serialNum, gid, rmOp.getPrevLid(), rmOp.getType(), useDocumentMetaStore(serialNum), IDestructorCallback::SP());
}
}
if (token.get() != NULL) {
@@ -584,19 +603,14 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm
}
void
-StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, Lid lid,
- FeedOperation::Type opType, IDestructorCallback::SP moveDoneCtx)
+StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, const document::GlobalId &gid, Lid lid,
+ FeedOperation::Type opType, bool enableNotifyRemoveDone, IDestructorCallback::SP moveDoneCtx)
{
removeSummary(serialNum, lid);
bool explicitReuseLid = _lidReuseDelayer.delayReuse(lid);
std::shared_ptr<RemoveDoneContext> onWriteDone;
- if (explicitReuseLid || token) {
- onWriteDone = createRemoveDoneContext(std::move(token), opType, _params._metrics, _writeService.master(),
- _metaStore, (explicitReuseLid ? lid : 0u), moveDoneCtx);
- } else if (moveDoneCtx) {
- onWriteDone = createRemoveDoneContext(FeedToken::UP(), opType, _params._metrics, _writeService.master(),
- _metaStore, 0u, moveDoneCtx);
- }
+ onWriteDone = createRemoveDoneContext(std::move(token), opType, _params._metrics, _writeService.master(),
+ _metaStore, _gidToLidChangeHandler, gid, (explicitReuseLid ? lid : 0u), serialNum, enableNotifyRemoveDone, moveDoneCtx);
bool immediateCommit = _commitTimeTracker.needCommit();
removeAttributes(serialNum, lid, immediateCommit, onWriteDone);
removeIndexedFields(serialNum, lid, immediateCommit, onWriteDone);
@@ -613,17 +627,12 @@ StoreOnlyFeedView::adjustMetaStore(const DocumentOperation &op, const DocumentId
op.getLid() != op.getPrevLid())
{
moveMetaData(_metaStore, docId, op);
- _gidToLidChangeHandler.notifyPut(docId.getGlobalId(), op.getLid(), serialNum);
} else {
putMetaData(_metaStore, docId, op, _params._subDbType == SubDbType::REMOVED);
- if (op.getDbDocumentId() != op.getPrevDbDocumentId()) {
- _gidToLidChangeHandler.notifyPut(docId.getGlobalId(), op.getLid(), serialNum);
- }
}
} else if (op.getValidPrevDbdId(_params._subDbId)) {
- removeMetaData(_metaStore, docId, op, _params._subDbType == SubDbType::REMOVED);
_gidToLidChangeHandler.notifyRemove(docId.getGlobalId(), serialNum);
- _gidToLidChangeHandler.notifyRemoveDone(docId.getGlobalId(), serialNum);
+ removeMetaData(_metaStore, docId, op, _params._subDbType == SubDbType::REMOVED);
}
_metaStore.commit(serialNum, serialNum);
}
@@ -650,23 +659,26 @@ StoreOnlyFeedView::removeDocuments(const RemoveDocumentsOperation &op, bool remo
const LidVector &lidsToRemove(ctx->getLidVector());
bool useDMS = useDocumentMetaStore(serialNum);
bool explicitReuseLids = false;
+ std::vector<document::GlobalId> gidsToRemove;
if (useDMS) {
- std::vector<document::GlobalId> gidsToRemove(getGidsToRemove(_metaStore, lidsToRemove));
- _metaStore.removeBatch(lidsToRemove, ctx->getDocIdLimit());
+ gidsToRemove = getGidsToRemove(_metaStore, lidsToRemove);
for (const auto &gid : gidsToRemove) {
_gidToLidChangeHandler.notifyRemove(gid, serialNum);
- _gidToLidChangeHandler.notifyRemoveDone(gid, serialNum);
}
+ _metaStore.removeBatch(lidsToRemove, ctx->getDocIdLimit());
_metaStore.commit(serialNum, serialNum);
explicitReuseLids = _lidReuseDelayer.delayReuse(lidsToRemove);
}
std::shared_ptr<search::IDestructorCallback> onWriteDone;
+ vespalib::Executor::Task::UP removeBatchDoneTask;
+ if (explicitReuseLids) {
+ removeBatchDoneTask = makeLambdaTask([=]() { _metaStore.removeBatchComplete(lidsToRemove); });
+ } else {
+ removeBatchDoneTask = makeLambdaTask([]() {});
+ }
+ onWriteDone = std::make_shared<RemoveBatchDoneContext>(_writeService.master(), std::move(removeBatchDoneTask),
+ _gidToLidChangeHandler, std::move(gidsToRemove), serialNum);
if (remove_index_and_attributes) {
- if (explicitReuseLids) {
- onWriteDone = std::make_shared<search::ScheduleTaskCallback>(
- _writeService.master(),
- makeLambdaTask([=]() { _metaStore.removeBatchComplete(lidsToRemove); }));
- }
removeIndexedFields(serialNum, lidsToRemove, immediateCommit, onWriteDone);
removeAttributes(serialNum, lidsToRemove, immediateCommit, onWriteDone);
}
@@ -675,9 +687,6 @@ StoreOnlyFeedView::removeDocuments(const RemoveDocumentsOperation &op, bool remo
removeSummary(serialNum, lid);
}
}
- if (explicitReuseLids && !onWriteDone) {
- _metaStore.removeBatchComplete(lidsToRemove);
- }
return lidsToRemove.size();
}
@@ -741,26 +750,29 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback::
moveOp.getSubDbId(), moveOp.getLid(), moveOp.getPrevSubDbId(), moveOp.getPrevLid(),
_params._subDbId, doc->toString(true).size(), doc->toString(true).c_str());
- uint32_t oldDocIdLimit = _metaStore.getCommittedDocIdLimit();
adjustMetaStore(moveOp, docId);
bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId);
if (moveOp.getValidDbdId(_params._subDbId)) {
bool immediateCommit = _commitTimeTracker.needCommit();
+ const document::GlobalId &gid = docId.getGlobalId();
FeedToken::UP token;
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(token, moveOp.getType(), _params._metrics,
- immediateCommit && (moveOp.getLid() >= oldDocIdLimit), doneCtx);
+ _gidToLidChangeHandler, gid, moveOp.getLid(),
+ serialNum, moveOp.changedDbdId() && useDocumentMetaStore(serialNum), doneCtx);
putSummary(serialNum, moveOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, moveOp.getLid(), *doc, immediateCommit, onWriteDone);
putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone);
}
if (docAlreadyExists && moveOp.changedDbdId()) {
- internalRemove(FeedToken::UP(), serialNum, moveOp.getPrevLid(), moveOp.getType(), doneCtx);
+ const document::GlobalId &gid = docId.getGlobalId();
+ bool enableNotifyRemoveDone = useDocumentMetaStore(serialNum) && !moveOp.getValidDbdId(_params._subDbId);
+ internalRemove(FeedToken::UP(), serialNum, gid, moveOp.getPrevLid(), moveOp.getType(), enableNotifyRemoveDone, doneCtx);
}
}
void
-StoreOnlyFeedView::heartBeat(search::SerialNum serialNum)
+StoreOnlyFeedView::heartBeat(SerialNum serialNum)
{
assert(_writeService.master().isCurrentThread());
_metaStore.removeAllOldGenerations();
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index 1302ef7b632..01a8122ed1e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -22,6 +22,8 @@
namespace search { class IDestructorCallback; }
+namespace document { class GLobalId; }
+
namespace proton {
class IReplayConfig;
@@ -178,8 +180,8 @@ private:
size_t removeDocuments(const RemoveDocumentsOperation &op, bool remove_index_and_attribute_fields,
bool immediateCommit);
- void internalRemove(FeedTokenUP token, SerialNum serialNum, Lid lid,
- FeedOperation::Type opType, std::shared_ptr<search::IDestructorCallback> moveDoneCtx);
+ void internalRemove(FeedTokenUP token, SerialNum serialNum, const document::GlobalId &gid, Lid lid,
+ FeedOperation::Type opType, bool enableNotifyRemoveDone, std::shared_ptr<search::IDestructorCallback> moveDoneCtx);
// Ack token early if visibility delay is nonzero
void considerEarlyAck(FeedTokenUP &token, FeedOperation::Type opType);
diff --git a/searchlib/src/vespa/searchlib/common/scheduletaskcallback.h b/searchlib/src/vespa/searchlib/common/scheduletaskcallback.h
index 4e12fb9d1b8..27bbe751532 100644
--- a/searchlib/src/vespa/searchlib/common/scheduletaskcallback.h
+++ b/searchlib/src/vespa/searchlib/common/scheduletaskcallback.h
@@ -2,6 +2,7 @@
#pragma once
#include "idestructorcallback.h"
+#include <vespa/vespalib/util/executor.h>
namespace search
{