summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-26 21:30:47 +0200
committerGitHub <noreply@github.com>2020-09-26 21:30:47 +0200
commitf6683a698da0a7478a00dcc57c9abf2dccc35f4a (patch)
tree60d8dae4397bdff0068e255b3e26efb8f4662ba8 /searchcore
parent511057bb713a56f99a9b73e2ab3de998f72aee67 (diff)
Revert "Revert "Revert "Revert "Revert "Balder/group operations to tls and commit in batches"""""
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp32
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp26
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h2
13 files changed, 21 insertions, 68 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp
index 1f979d1566c..b04bac5ef26 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp
@@ -7,7 +7,6 @@ namespace proton::documentmetastore {
LidReuseDelayerConfig::LidReuseDelayerConfig(const DocumentDBConfig & configSnapshot)
: _visibilityDelay(configSnapshot.getMaintenanceConfigSP()->getVisibilityDelay()),
- _allowEarlyAck(configSnapshot.getMaintenanceConfigSP()->allowEarlyAck()),
_hasIndexedOrAttributeFields(configSnapshot.getSchemaSP()->getNumIndexFields() > 0 ||
configSnapshot.getSchemaSP()->getNumAttributeFields() > 0)
{
@@ -19,7 +18,6 @@ LidReuseDelayerConfig::LidReuseDelayerConfig()
LidReuseDelayerConfig::LidReuseDelayerConfig(vespalib::duration visibilityDelay, bool hasIndexedOrAttributeFields_in)
: _visibilityDelay(visibilityDelay),
- _allowEarlyAck(visibilityDelay > 1ms),
_hasIndexedOrAttributeFields(hasIndexedOrAttributeFields_in)
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h
index c81a2ff399f..82dab433a22 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.h
@@ -15,7 +15,6 @@ class LidReuseDelayerConfig
{
private:
vespalib::duration _visibilityDelay;
- bool _allowEarlyAck;
bool _hasIndexedOrAttributeFields;
public:
LidReuseDelayerConfig();
@@ -23,7 +22,6 @@ public:
explicit LidReuseDelayerConfig(const DocumentDBConfig &configSnapshot);
vespalib::duration visibilityDelay() const { return _visibilityDelay; }
bool hasIndexedOrAttributeFields() const { return _hasIndexedOrAttributeFields; }
- bool allowEarlyAck() const { return _allowEarlyAck; }
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp
index ed2202c830b..03dfd83a132 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.cpp
@@ -16,7 +16,6 @@ LidReuseDelayer::LidReuseDelayer(IThreadingService &writeService, IStore &docume
: _writeService(writeService),
_documentMetaStore(documentMetaStore),
_immediateCommit(config.visibilityDelay() == vespalib::duration::zero()),
- _allowEarlyAck(config.allowEarlyAck()),
_config(config),
_pendingLids()
{
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h
index ba407ab57f8..5f1de878b4a 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lidreusedelayer.h
@@ -27,7 +27,6 @@ class LidReuseDelayer
searchcorespi::index::IThreadingService &_writeService;
IStore &_documentMetaStore;
const bool _immediateCommit;
- const bool _allowEarlyAck;
LidReuseDelayerConfig _config;
std::vector<uint32_t> _pendingLids; // lids waiting for commit
@@ -39,8 +38,7 @@ public:
bool delayReuse(const std::vector<uint32_t> &lids);
std::vector<uint32_t> getReuseLids();
- bool needImmediateCommit() const { return _immediateCommit; }
- bool allowEarlyAck() const { return _allowEarlyAck; }
+ bool getImmediateCommit() const { return _immediateCommit; }
const LidReuseDelayerConfig & getConfig() const { return _config; }
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
index 5ed0ad7492c..4c0485baec6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
@@ -135,7 +135,6 @@ public:
}
vespalib::duration getVisibilityDelay() const { return _visibilityDelay; }
bool hasVisibilityDelay() const { return _visibilityDelay > vespalib::duration::zero(); }
- bool allowEarlyAck() const { return _visibilityDelay > 1ms; }
const DocumentDBLidSpaceCompactionConfig &getLidSpaceCompactionConfig() const {
return _lidSpaceCompaction;
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 209a35ce4a2..37dfddf0c2c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -402,8 +402,6 @@ FeedHandler::FeedHandler(IThreadingService &writeService,
_tlsReplayProgress(),
_serialNum(0),
_prunedSerialNum(0),
- _numPendingCommit(0),
- _numCommitsCompleted(0),
_delayedPrune(false),
_feedLock(),
_feedState(make_shared<InitState>(getDocTypeName())),
@@ -497,39 +495,11 @@ FeedHandler::getTransactionLogReplayDone() const {
}
void
-FeedHandler::onCommitDone(uint64_t numPendingAtStart) {
- assert(numPendingAtStart <= _numPendingCommit);
- _numPendingCommit -= numPendingAtStart;
- if (_numPendingCommit > 0) {
- enqueCommitTask();
- }
-}
-
-void FeedHandler::enqueCommitTask() {
- _writeService.master().execute(makeLambdaTask([this]() { initiateCommit(); }));
-}
-
-void
-FeedHandler::initiateCommit() {
- auto commitResult = _tlsWriter->startCommit(std::make_shared<OnCommitDone>(
- _writeService.master(),
- makeLambdaTask([this, numPendingAtStart=_numPendingCommit]() {
- onCommitDone(numPendingAtStart);
- })));
- if (_activeFeedView) {
- _activeFeedView->forceCommit(_serialNum, std::make_shared<KeepAlive<CommitResult>>(std::move(commitResult)));
- }
-}
-
-void
FeedHandler::appendOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) {
if (!op.getSerialNum()) {
const_cast<FeedOperation &>(op).setSerialNum(incSerialNum());
}
_tlsWriter->appendOperation(op, std::move(onDone));
- if (++_numPendingCommit == 1) {
- enqueCommitTask();
- }
}
FeedHandler::CommitResult
@@ -540,7 +510,7 @@ FeedHandler::startCommit(DoneCallback onDone) {
void
FeedHandler::storeOperationSync(const FeedOperation &op) {
vespalib::Gate gate;
- appendAndCommitOperation(op, make_shared<search::GateCallback>(gate));
+ appendOperation(op, make_shared<search::GateCallback>(gate));
gate.await();
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index 97629bfc018..4807c596130 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -76,8 +76,6 @@ private:
// the serial num of the last message in the transaction log
SerialNum _serialNum;
SerialNum _prunedSerialNum;
- size_t _numPendingCommit;
- size_t _numCommitsCompleted;
bool _delayedPrune;
mutable std::shared_mutex _feedLock;
FeedStateSP _feedState;
@@ -127,9 +125,6 @@ private:
FeedStateSP getFeedState() const;
void changeFeedState(FeedStateSP newState);
void doChangeFeedState(FeedStateSP newState);
- void onCommitDone(uint64_t numPendingAtStart);
- void initiateCommit();
- void enqueCommitTask();
public:
FeedHandler(const FeedHandler &) = delete;
FeedHandler & operator = (const FeedHandler &) = delete;
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
index c3b76a9db75..b276779c2ee 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
@@ -22,10 +22,6 @@ struct IOperationStorer
*/
virtual void appendOperation(const FeedOperation &op, DoneCallback onDone) = 0;
[[nodiscard]] virtual CommitResult startCommit(DoneCallback onDone) = 0;
- void appendAndCommitOperation(const FeedOperation &op, DoneCallback onDone) {
- appendOperation(op, onDone);
- (void) startCommit(std::move(onDone));
- }
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
index 468850b4409..d423e095ad9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
@@ -82,7 +82,7 @@ LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats)
uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1;
CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit);
vespalib::Gate gate;
- _opStorer.appendAndCommitOperation(op, std::make_shared<search::GateCallback>(gate));
+ _opStorer.appendOperation(op, std::make_shared<search::GateCallback>(gate));
gate.await();
_handler.handleCompactLidSpace(op);
EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit);
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
index 35549f21471..37497eaa998 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
@@ -63,13 +63,13 @@ public:
~LidSpaceCompactionJob();
// Implements IDiskMemUsageListener
- void notifyDiskMemUsage(DiskMemUsageState state) override;
+ virtual void notifyDiskMemUsage(DiskMemUsageState state) override;
// Implements IClusterStateChangedNofifier
- void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override;
+ virtual void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override;
// Implements IMaintenanceJob
- bool run() override;
+ virtual bool run() override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
index d4b542a0af8..d94bb2e3d03 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
@@ -105,7 +105,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller,
AttributeUsageFilter &attributeUsageFilter) {
controller.registerJobInMasterThread(std::make_unique<HeartBeatJob>(hbHandler, config.getHeartBeatConfig()));
controller.registerJobInDefaultPool(std::make_unique<PruneSessionCacheJob>(scPruner, config.getSessionCachePruneInterval()));
- if (config.hasVisibilityDelay() && config.allowEarlyAck()) {
+ if (config.hasVisibilityDelay()) {
controller.registerJobInMasterThread(std::make_unique<DocumentDBCommitJob>(commit, config.getVisibilityDelay()));
}
const MaintenanceDocumentSubDB &mRemSubDB(controller.getRemSubDB());
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 9927a2d2ce0..217a3bb24d3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -210,11 +210,11 @@ moveMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_id, c
}
std::unique_ptr<PendingLidTrackerBase>
-createUncommitedLidTracker(bool allowEarlyAck) {
- if (allowEarlyAck) {
- return std::make_unique<TwoPhasePendingLidTracker>();
- } else {
+createUncommitedLidTracker(bool needImmediateCommit) {
+ if (needImmediateCommit) {
return std::make_unique<PendingLidTracker>();
+ } else {
+ return std::make_unique<TwoPhasePendingLidTracker>();
}
}
@@ -229,7 +229,7 @@ StoreOnlyFeedView::StoreOnlyFeedView(const Context &ctx, const PersistentParams
_docType(nullptr),
_lidReuseDelayer(ctx._writeService, _documentMetaStoreContext->get(), ctx._lidReuseDelayerConfig),
_pendingLidsForDocStore(),
- _pendingLidsForCommit(createUncommitedLidTracker(_lidReuseDelayer.allowEarlyAck())),
+ _pendingLidsForCommit(createUncommitedLidTracker(_lidReuseDelayer.getImmediateCommit())),
_schema(ctx._schema),
_writeService(ctx._writeService),
_params(params),
@@ -275,7 +275,7 @@ StoreOnlyFeedView::internalForceCommit(SerialNum serialNum, OnForceCommitDoneTyp
void
StoreOnlyFeedView::considerEarlyAck(FeedToken & token)
{
- if ( _lidReuseDelayer.allowEarlyAck() && token) {
+ if ( ! needCommit() && token) {
token.reset();
}
}
@@ -327,7 +327,7 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp)
bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId);
if (putOp.getValidDbdId(_params._subDbId)) {
- bool immediateCommit = needImmediateCommit();
+ bool immediateCommit = needCommit();
const document::GlobalId &gid = docId.getGlobalId();
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(std::move(token), std::move(uncommitted),
@@ -345,8 +345,8 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp)
}
bool
-StoreOnlyFeedView::needImmediateCommit() const {
- return _lidReuseDelayer.needImmediateCommit();
+StoreOnlyFeedView::needCommit() const {
+ return _lidReuseDelayer.getImmediateCommit();
}
void
@@ -483,7 +483,7 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp)
auto uncommitted = _pendingLidsForCommit->produce(updOp.getLid());
considerEarlyAck(token);
- bool immediateCommit = needImmediateCommit();
+ bool immediateCommit = needCommit();
auto onWriteDone = createUpdateDoneContext(std::move(token), std::move(uncommitted), updOp.getUpdate());
UpdateScope updateScope(*_schema, upd);
updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone, updateScope);
@@ -657,7 +657,7 @@ StoreOnlyFeedView::internalRemove(FeedToken token, IPendingLidTracker::Token unc
std::move(pendingNotifyRemoveDone), (explicitReuseLid ? lid : 0u),
std::move(moveDoneCtx));
removeSummary(serialNum, lid, onWriteDone);
- bool immediateCommit = needImmediateCommit();
+ bool immediateCommit = needCommit();
removeAttributes(serialNum, lid, immediateCommit, onWriteDone);
removeIndexedFields(serialNum, lid, immediateCommit, onWriteDone);
}
@@ -770,7 +770,7 @@ StoreOnlyFeedView::handleDeleteBucket(const DeleteBucketOperation &delOp)
void
StoreOnlyFeedView::internalDeleteBucket(const DeleteBucketOperation &delOp)
{
- bool immediateCommit = needImmediateCommit();
+ bool immediateCommit = needCommit();
size_t rm_count = removeDocuments(delOp, true, immediateCommit);
LOG(debug, "internalDeleteBucket(): docType(%s), bucket(%s), lidsToRemove(%zu)",
_params._docTypeName.toString().c_str(), delOp.getBucketId().toString().c_str(), rm_count);
@@ -809,7 +809,7 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback::
PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(moveOp, docId.getGlobalId(), docId);
bool docAlreadyExists = moveOp.getValidPrevDbdId(_params._subDbId);
if (moveOp.getValidDbdId(_params._subDbId)) {
- bool immediateCommit = needImmediateCommit();
+ bool immediateCommit = needCommit();
const document::GlobalId &gid = docId.getGlobalId();
std::shared_ptr<PutDoneContext> onWriteDone =
createPutDoneContext(FeedToken(), _pendingLidsForCommit->produce(moveOp.getLid()),
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index 4569e01f9fd..7d91ea86a22 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -161,7 +161,7 @@ private:
void putSummary(SerialNum serialNum, Lid lid, DocumentSP doc, OnOperationDoneType onDone);
void removeSummary(SerialNum serialNum, Lid lid, OnWriteDoneType onDone);
void heartBeatSummary(SerialNum serialNum);
- bool needImmediateCommit() const;
+ bool needCommit() const;
bool useDocumentStore(SerialNum replaySerialNum) const {