aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
author Henning Baldersheim <balder@yahoo-inc.com> 2021-01-19 22:09:40 +0000
committer Henning Baldersheim <balder@yahoo-inc.com> 2021-01-22 11:57:59 +0000
commit 9c70574815a819ba3838d6ac86abd548077002e6 (patch)
tree 490a7a78e929192e6cbd2cdccbd4109409bbc581 /searchcore
parent e41754597890c4611980fee95e8aec8f9b29e476 (diff)
Make a lidspace job that uses a bucket executor to hold the bucket lock.
Diffstat (limited to 'searchcore')
-rw-r--r-- searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp 3
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt 2
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.cpp 10
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.h 1
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/i_document_scan_iterator.h 2
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp 165
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h 44
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp 180
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h 72
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp 70
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h 45
11 files changed, 399 insertions, 195 deletions
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
index c7239eaba16..b4c878b26eb 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
@@ -72,6 +72,9 @@ struct MyScanIterator : public IDocumentScanIterator {
}
return search::DocumentMetaData();
}
+ // Test stub: fabricates meta data for the requested lid from the fixed test
+ // constants (TIMESTAMP_1, BUCKET_ID_1, GID_1) without consulting any stored state.
+ search::DocumentMetaData getMetaData(uint32_t lid) const override {
+ return search::DocumentMetaData(lid, TIMESTAMP_1, BUCKET_ID_1, GID_1);
+ }
};
struct MyHandler : public ILidSpaceCompactionHandler {
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
index 73b7404ce31..26fba8a780f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
@@ -58,6 +58,8 @@ vespa_add_library(searchcore_server STATIC
job_tracked_maintenance_job.cpp
lid_space_compaction_handler.cpp
lid_space_compaction_job.cpp
+ lid_space_compaction_job_base.cpp
+ lid_space_compaction_job_take2.cpp
maintenance_controller_explorer.cpp
maintenance_jobs_injector.cpp
maintenancecontroller.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.cpp b/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.cpp
index 0332303abae..35c03ef73d3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.cpp
@@ -39,4 +39,14 @@ DocumentScanIterator::next(uint32_t compactLidLimit, bool retry)
return DocumentMetaData();
}
+/**
+ * Looks up the meta data currently stored for the given lid.
+ *
+ * @param lid The lid to look up.
+ * @return The meta data (lid, timestamp, bucket id, gid) stored for that lid,
+ *         or a default-constructed DocumentMetaData if the meta store does
+ *         not consider the lid valid.
+ */
+search::DocumentMetaData
+DocumentScanIterator::getMetaData(uint32_t lid) const {
+    if (_metaStore.validLid(lid)) {
+        // Read and report the entry for the requested lid. The scan cursor
+        // (_lastLid) is unrelated to this lookup and must not be used here,
+        // otherwise the caller gets meta data for a different document.
+        const RawDocumentMetaData &metaData = _metaStore.getRawMetaData(lid);
+        return DocumentMetaData(lid, metaData.getTimestamp(),
+                                metaData.getBucketId(), metaData.getGid());
+    }
+    return DocumentMetaData();
+}
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.h b/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.h
index 62dff0a7d85..a465ca46fa4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.h
+++ b/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.h
@@ -22,6 +22,7 @@ public:
DocumentScanIterator(const IDocumentMetaStore &_metaStore);
bool valid() const override;
search::DocumentMetaData next(uint32_t compactLidLimit, bool retry) override;
+ search::DocumentMetaData getMetaData(uint32_t lid) const override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_document_scan_iterator.h b/searchcore/src/vespa/searchcore/proton/server/i_document_scan_iterator.h
index bcc52ffb475..2e66c2bf469 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_document_scan_iterator.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_document_scan_iterator.h
@@ -30,6 +30,8 @@ struct IDocumentScanIterator
* @param retry Whether we should start the scan with the previous returned document.
*/
virtual search::DocumentMetaData next(uint32_t compactLidLimit, bool retry) = 0;
+
+ /**
+ * Returns the meta data for the given lid.
+ *
+ * Unlike next(), this takes the lid as an explicit argument.
+ *
+ * @param lid The lid to look up.
+ */
+ virtual search::DocumentMetaData getMetaData(uint32_t lid) const = 0;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
index 228c1e4355e..c752b7950c3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
@@ -1,20 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "i_disk_mem_usage_notifier.h"
-#include "iclusterstatechangednotifier.h"
-#include "imaintenancejobrunner.h"
#include "lid_space_compaction_job.h"
#include "i_document_scan_iterator.h"
#include "ifrozenbuckethandler.h"
+#include "i_lid_space_compaction_handler.h"
+#include "i_operation_storer.h"
#include <vespa/searchcore/proton/feedoperation/moveoperation.h>
-#include <vespa/searchcore/proton/feedoperation/compact_lid_space_operation.h>
-#include <vespa/searchcore/proton/common/eventlogger.h>
-#include <vespa/vespalib/util/destructor_callbacks.h>
-#include <vespa/vespalib/util/gate.h>
-#include <cassert>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".proton.server.lid_space_compaction_job");
using search::DocumentMetaData;
using search::LidUsageStats;
@@ -22,33 +13,11 @@ using search::LidUsageStats;
namespace proton {
bool
-LidSpaceCompactionJob::hasTooMuchLidBloat(const LidUsageStats &stats) const
-{
- return (stats.getLidBloat() >= _cfg.getAllowedLidBloat() &&
- stats.getLidBloatFactor() >= _cfg.getAllowedLidBloatFactor() &&
- stats.getLidLimit() > stats.getLowestFreeLid());
-}
-
-bool
-LidSpaceCompactionJob::shouldRestartScanDocuments(const LidUsageStats &stats) const
-{
- return (stats.getUsedLids() + _cfg.getAllowedLidBloat()) < stats.getHighestUsedLid() &&
- stats.getLowestFreeLid() < stats.getHighestUsedLid();
-}
-
-DocumentMetaData
-LidSpaceCompactionJob::getNextDocument(const LidUsageStats &stats)
-{
- DocumentMetaData document = _scanItr->next(std::max(stats.getLowestFreeLid(), stats.getUsedLids()), _retryFrozenDocument);
- _retryFrozenDocument = false;
- return document;
-}
-
-bool
LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats)
{
if (_scanItr->valid()) {
- DocumentMetaData document = getNextDocument(stats);
+ DocumentMetaData document = getNextDocument(stats, _retryFrozenDocument);
+ _retryFrozenDocument = false;
if (document.valid()) {
IFrozenBucketHandler::ExclusiveBucketGuard::UP bucketGuard = _frozenHandler.acquireExclusiveBucket(document.bucketId);
if ( ! bucketGuard ) {
@@ -57,7 +26,7 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats)
_retryFrozenDocument = true;
return true;
} else {
- MoveOperation::UP op = _handler.createMoveOperation(document, stats.getLowestFreeLid());
+ std::unique_ptr<MoveOperation> op = _handler.createMoveOperation(document, stats.getLowestFreeLid());
if ( ! op ) {
return false;
}
@@ -70,40 +39,7 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats)
}
}
}
- if (!_scanItr->valid()){
- if (shouldRestartScanDocuments(_handler.getLidStatus())) {
- _scanItr = _handler.getIterator();
- } else {
- _scanItr = IDocumentScanIterator::UP();
- _shouldCompactLidSpace = true;
- }
- }
- return false; // more work to do (scan documents or compact lid space)
-}
-
-void
-LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats)
-{
- uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1;
- CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit);
- vespalib::Gate gate;
- auto commit_result = _opStorer.appendAndCommitOperation(op, std::make_shared<vespalib::GateCallback>(gate));
- gate.await();
- _handler.handleCompactLidSpace(op, std::make_shared<vespalib::KeepAlive<decltype(commit_result)>>(std::move(commit_result)));
- EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit);
- _shouldCompactLidSpace = false;
-}
-
-bool
-LidSpaceCompactionJob::remove_batch_is_ongoing() const
-{
- return _ops_rate_tracker->remove_batch_above_threshold();
-}
-
-bool
-LidSpaceCompactionJob::remove_is_ongoing() const
-{
- return _ops_rate_tracker->remove_above_threshold();
+ return scanDocumentsPost();
}
LidSpaceCompactionJob::LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
@@ -114,94 +50,13 @@ LidSpaceCompactionJob::LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionC
const BlockableMaintenanceJobConfig &blockableConfig,
IClusterStateChangedNotifier &clusterStateChangedNotifier,
bool nodeRetired)
- : BlockableMaintenanceJob("lid_space_compaction." + handler.getName(),
- config.getDelay(), config.getInterval(), blockableConfig),
- _cfg(config),
- _handler(handler),
- _opStorer(opStorer),
+ : LidSpaceCompactionJobBase(config, handler, opStorer, diskMemUsageNotifier,
+ blockableConfig, clusterStateChangedNotifier, nodeRetired),
_frozenHandler(frozenHandler),
- _scanItr(),
- _retryFrozenDocument(false),
- _shouldCompactLidSpace(false),
- _diskMemUsageNotifier(diskMemUsageNotifier),
- _clusterStateChangedNotifier(clusterStateChangedNotifier),
- _ops_rate_tracker(std::make_shared<RemoveOperationsRateTracker>(config.get_remove_batch_block_rate(),
- config.get_remove_block_rate())),
- _is_disabled(false)
-{
- _diskMemUsageNotifier.addDiskMemUsageListener(this);
- _clusterStateChangedNotifier.addClusterStateChangedHandler(this);
- if (nodeRetired) {
- setBlocked(BlockedReason::CLUSTER_STATE);
- }
- handler.set_operation_listener(_ops_rate_tracker);
-}
-
-LidSpaceCompactionJob::~LidSpaceCompactionJob()
-{
- _clusterStateChangedNotifier.removeClusterStateChangedHandler(this);
- _diskMemUsageNotifier.removeDiskMemUsageListener(this);
-}
-
-bool
-LidSpaceCompactionJob::run()
+ _retryFrozenDocument(false)
{
- if (isBlocked()) {
- return true; // indicate work is done since no work can be done
- }
- LidUsageStats stats = _handler.getLidStatus();
- if (remove_batch_is_ongoing()) {
- // Note that we don't set the job as blocked as the decision to un-block it is not driven externally.
- LOG(info, "%s: Lid space compaction is disabled while remove batch (delete buckets) is ongoing",
- _handler.getName().c_str());
- _is_disabled = true;
- return true;
- }
- if (remove_is_ongoing()) {
- // Note that we don't set the job as blocked as the decision to un-block it is not driven externally.
- LOG(info, "%s: Lid space compaction is disabled while remove operations are ongoing",
- _handler.getName().c_str());
- _is_disabled = true;
- return true;
- }
- if (_is_disabled) {
- LOG(info, "%s: Lid space compaction is re-enabled as remove operations are no longer ongoing",
- _handler.getName().c_str());
- _is_disabled = false;
- }
- if (_scanItr) {
- return scanDocuments(stats);
- } else if (_shouldCompactLidSpace) {
- compactLidSpace(stats);
- } else if (hasTooMuchLidBloat(stats)) {
- assert(!_scanItr);
- _scanItr = _handler.getIterator();
- return scanDocuments(stats);
- }
- return true;
}
-void
-LidSpaceCompactionJob::notifyDiskMemUsage(DiskMemUsageState state)
-{
- // Called by master write thread
- internalNotifyDiskMemUsage(state);
-}
+LidSpaceCompactionJob::~LidSpaceCompactionJob() = default;
-void
-LidSpaceCompactionJob::notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc)
-{
- // Called by master write thread
- bool nodeRetired = newCalc->nodeRetired();
- if (!nodeRetired) {
- if (isBlocked(BlockedReason::CLUSTER_STATE)) {
- LOG(info, "%s: Lid space compaction is un-blocked as node is no longer retired", _handler.getName().c_str());
- unBlock(BlockedReason::CLUSTER_STATE);
- }
- } else {
- LOG(info, "%s: Lid space compaction is blocked as node is retired", _handler.getName().c_str());
- setBlocked(BlockedReason::CLUSTER_STATE);
- }
}
-
-} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
index e9d5de58323..93c4ccfbc20 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
@@ -1,21 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "blockable_maintenance_job.h"
-#include "document_db_maintenance_config.h"
-#include "i_disk_mem_usage_listener.h"
-#include "i_lid_space_compaction_handler.h"
-#include "i_operation_storer.h"
-#include "ibucketstatecalculator.h"
-#include "iclusterstatechangedhandler.h"
-#include "iclusterstatechangednotifier.h"
-#include "remove_operations_rate_tracker.h"
+#include "lid_space_compaction_job_base.h"
namespace proton {
class IFrozenBucketHandler;
-class IDiskMemUsageNotifier;
-class IClusterStateChangedNotifier;
/**
* Job that regularly checks whether lid space compaction should be performed
@@ -24,30 +14,13 @@ class IClusterStateChangedNotifier;
* Compaction is handled by moving documents from high lids to low free lids.
* A handler is typically working over a single document sub db.
*/
-class LidSpaceCompactionJob : public BlockableMaintenanceJob,
- public IDiskMemUsageListener,
- public IClusterStateChangedHandler
+class LidSpaceCompactionJob : public LidSpaceCompactionJobBase
{
private:
- const DocumentDBLidSpaceCompactionConfig _cfg;
- ILidSpaceCompactionHandler &_handler;
- IOperationStorer &_opStorer;
IFrozenBucketHandler &_frozenHandler;
- std::unique_ptr<IDocumentScanIterator> _scanItr;
bool _retryFrozenDocument;
- bool _shouldCompactLidSpace;
- IDiskMemUsageNotifier &_diskMemUsageNotifier;
- IClusterStateChangedNotifier &_clusterStateChangedNotifier;
- std::shared_ptr<RemoveOperationsRateTracker> _ops_rate_tracker;
- bool _is_disabled;
- bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const;
- bool shouldRestartScanDocuments(const search::LidUsageStats &stats) const;
- search::DocumentMetaData getNextDocument(const search::LidUsageStats &stats);
- bool scanDocuments(const search::LidUsageStats &stats);
- void compactLidSpace(const search::LidUsageStats &stats);
- bool remove_batch_is_ongoing() const;
- bool remove_is_ongoing() const;
+ bool scanDocuments(const search::LidUsageStats &stats) override;
public:
LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
@@ -58,16 +31,7 @@ public:
const BlockableMaintenanceJobConfig &blockableConfig,
IClusterStateChangedNotifier &clusterStateChangedNotifier,
bool nodeRetired);
- ~LidSpaceCompactionJob();
-
- // Implements IDiskMemUsageListener
- void notifyDiskMemUsage(DiskMemUsageState state) override;
-
- // Implements IClusterStateChangedNofifier
- void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override;
-
- // Implements IMaintenanceJob
- bool run() override;
+ ~LidSpaceCompactionJob() override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp
new file mode 100644
index 00000000000..671a9c327d0
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp
@@ -0,0 +1,180 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "lid_space_compaction_job_base.h"
+#include "imaintenancejobrunner.h"
+#include "i_document_scan_iterator.h"
+#include "i_lid_space_compaction_handler.h"
+#include "i_operation_storer.h"
+#include "remove_operations_rate_tracker.h"
+#include "i_disk_mem_usage_notifier.h"
+#include "iclusterstatechangednotifier.h"
+#include <vespa/searchcore/proton/feedoperation/compact_lid_space_operation.h>
+#include <vespa/searchcore/proton/common/eventlogger.h>
+#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <vespa/vespalib/util/gate.h>
+#include <cassert>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".proton.server.lid_space_compaction_job");
+
+using search::DocumentMetaData;
+using search::LidUsageStats;
+
+namespace proton {
+
+/**
+ * Tells whether the lid space is bloated enough to start a compaction scan.
+ *
+ * All of the following must hold:
+ *  - the lid bloat has reached the configured allowed lid bloat,
+ *  - the lid bloat factor has reached the configured allowed factor,
+ *  - the lid limit is above the lowest free lid.
+ */
+bool
+LidSpaceCompactionJobBase::hasTooMuchLidBloat(const LidUsageStats &stats) const
+{
+    if (!(stats.getLidBloat() >= _cfg.getAllowedLidBloat())) {
+        return false;
+    }
+    if (!(stats.getLidBloatFactor() >= _cfg.getAllowedLidBloatFactor())) {
+        return false;
+    }
+    return stats.getLidLimit() > stats.getLowestFreeLid();
+}
+
+/**
+ * Decides whether a finished scan should be started over again.
+ *
+ * A new scan is wanted when the highest used lid is still above both
+ * (used lids + allowed lid bloat) and the lowest free lid.
+ */
+bool
+LidSpaceCompactionJobBase::shouldRestartScanDocuments(const LidUsageStats &stats) const
+{
+    const auto highestUsedLid = stats.getHighestUsedLid();
+    if (!((stats.getUsedLids() + _cfg.getAllowedLidBloat()) < highestUsedLid)) {
+        return false;
+    }
+    return stats.getLowestFreeLid() < highestUsedLid;
+}
+
+/**
+ * Asks the scan iterator for the next document to consider.
+ *
+ * The compact lid limit handed to the iterator is the larger of the lowest
+ * free lid and the number of used lids.
+ *
+ * @param stats             Current lid usage statistics.
+ * @param retryLastDocument Forwarded as the iterator's retry flag.
+ */
+DocumentMetaData
+LidSpaceCompactionJobBase::getNextDocument(const LidUsageStats &stats, bool retryLastDocument)
+{
+    const auto compactLidLimit = std::max(stats.getLowestFreeLid(), stats.getUsedLids());
+    return _scanItr->next(compactLidLimit, retryLastDocument);
+}
+
+/**
+ * Bookkeeping shared by the scanDocuments() implementations once a scan step is done.
+ *
+ * When the iterator is exhausted, either a fresh iterator is fetched (scan
+ * restart) or the iterator is dropped and the job is flagged to compact the
+ * lid space on a later run.
+ *
+ * @return Always false: more work remains (scan documents or compact lid space).
+ */
+bool
+LidSpaceCompactionJobBase::scanDocumentsPost()
+{
+    if (_scanItr->valid()) {
+        return false; // more work to do (scan documents)
+    }
+    if (shouldRestartScanDocuments(_handler.getLidStatus())) {
+        _scanItr = _handler.getIterator();
+    } else {
+        _scanItr.reset();
+        _shouldCompactLidSpace = true;
+    }
+    return false; // more work to do (scan documents or compact lid space)
+}
+
+/**
+ * Shrinks the lid space down to just above the highest used lid.
+ *
+ * The operation is appended and committed via the operation storer, the call
+ * waits on a gate tied to the commit callback, and then the handler performs
+ * the compaction. Finally the completion is logged as an event and the
+ * "should compact" flag is cleared.
+ */
+void
+LidSpaceCompactionJobBase::compactLidSpace(const LidUsageStats &stats)
+{
+ // New lid limit is one past the highest lid still in use.
+ uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1;
+ CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit);
+ vespalib::Gate gate;
+ auto commit_result = _opStorer.appendAndCommitOperation(op, std::make_shared<vespalib::GateCallback>(gate));
+ // Blocks until the gate is opened; presumably that happens when the commit
+ // callback is released - TODO confirm against GateCallback.
+ gate.await();
+ // The commit result is wrapped in a KeepAlive and handed to the handler as its done-context.
+ _handler.handleCompactLidSpace(op, std::make_shared<vespalib::KeepAlive<decltype(commit_result)>>(std::move(commit_result)));
+ EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit);
+ _shouldCompactLidSpace = false;
+}
+
+// Returns whether the operations rate tracker reports remove batch
+// operations above its threshold; run() uses this to pause compaction.
+bool
+LidSpaceCompactionJobBase::remove_batch_is_ongoing() const
+{
+ return _ops_rate_tracker->remove_batch_above_threshold();
+}
+
+// Returns whether the operations rate tracker reports remove operations
+// above its threshold; run() uses this to pause compaction.
+bool
+LidSpaceCompactionJobBase::remove_is_ongoing() const
+{
+ return _ops_rate_tracker->remove_above_threshold();
+}
+
+/**
+ * Sets up the job and hooks it into its notifiers.
+ *
+ * The job is named "lid_space_compaction." followed by the handler name and
+ * takes its delay and interval from the compaction config. It registers
+ * itself as disk/mem usage listener and cluster state changed handler, and
+ * installs a remove operations rate tracker as the handler's operation listener.
+ *
+ * @param nodeRetired When true the job starts out blocked with reason CLUSTER_STATE.
+ */
+LidSpaceCompactionJobBase::LidSpaceCompactionJobBase(const DocumentDBLidSpaceCompactionConfig &config,
+ ILidSpaceCompactionHandler &handler,
+ IOperationStorer &opStorer,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
+ const BlockableMaintenanceJobConfig &blockableConfig,
+ IClusterStateChangedNotifier &clusterStateChangedNotifier,
+ bool nodeRetired)
+ : BlockableMaintenanceJob("lid_space_compaction." + handler.getName(),
+ config.getDelay(), config.getInterval(), blockableConfig),
+ _cfg(config),
+ _handler(handler),
+ _opStorer(opStorer),
+ _scanItr(),
+ _diskMemUsageNotifier(diskMemUsageNotifier),
+ _clusterStateChangedNotifier(clusterStateChangedNotifier),
+ _ops_rate_tracker(std::make_shared<RemoveOperationsRateTracker>(config.get_remove_batch_block_rate(),
+ config.get_remove_block_rate())),
+ _is_disabled(false),
+ _shouldCompactLidSpace(false)
+{
+ // Registrations made here are undone in the destructor.
+ _diskMemUsageNotifier.addDiskMemUsageListener(this);
+ _clusterStateChangedNotifier.addClusterStateChangedHandler(this);
+ if (nodeRetired) {
+ setBlocked(BlockedReason::CLUSTER_STATE);
+ }
+ handler.set_operation_listener(_ops_rate_tracker);
+}
+
+// Deregisters the job from the two notifiers it registered with in the
+// constructor, in the reverse order of registration.
+LidSpaceCompactionJobBase::~LidSpaceCompactionJobBase()
+{
+ _clusterStateChangedNotifier.removeClusterStateChangedHandler(this);
+ _diskMemUsageNotifier.removeDiskMemUsageListener(this);
+}
+
+/**
+ * Executes one step of the compaction job.
+ *
+ * Order of decisions:
+ *  1. A blocked job does nothing.
+ *  2. While remove batch or remove operations are above their thresholds the
+ *     job is marked disabled and does nothing.
+ *  3. An active scan is continued via scanDocuments().
+ *  4. Otherwise a pending lid space compaction is carried out.
+ *  5. Otherwise a new scan is started if there is too much lid bloat.
+ *
+ * @return true when no more work should be done in this round, false when
+ *         there is more work to do.
+ */
+bool
+LidSpaceCompactionJobBase::run()
+{
+ if (isBlocked()) {
+ return true; // indicate work is done since no work can be done
+ }
+ LidUsageStats stats = _handler.getLidStatus();
+ if (remove_batch_is_ongoing()) {
+ // Note that we don't set the job as blocked as the decision to un-block it is not driven externally.
+ LOG(info, "%s: Lid space compaction is disabled while remove batch (delete buckets) is ongoing",
+ _handler.getName().c_str());
+ _is_disabled = true;
+ return true;
+ }
+ if (remove_is_ongoing()) {
+ // Note that we don't set the job as blocked as the decision to un-block it is not driven externally.
+ LOG(info, "%s: Lid space compaction is disabled while remove operations are ongoing",
+ _handler.getName().c_str());
+ _is_disabled = true;
+ return true;
+ }
+ // Reaching this point with the flag set means the remove load has dropped again.
+ if (_is_disabled) {
+ LOG(info, "%s: Lid space compaction is re-enabled as remove operations are no longer ongoing",
+ _handler.getName().c_str());
+ _is_disabled = false;
+ }
+ if (_scanItr) {
+ return scanDocuments(stats);
+ } else if (_shouldCompactLidSpace) {
+ compactLidSpace(stats);
+ } else if (hasTooMuchLidBloat(stats)) {
+ assert(!_scanItr);
+ _scanItr = _handler.getIterator();
+ return scanDocuments(stats);
+ }
+ return true;
+}
+
+// IDiskMemUsageListener callback: forwards the new disk/mem usage state to
+// internalNotifyDiskMemUsage().
+void
+LidSpaceCompactionJobBase::notifyDiskMemUsage(DiskMemUsageState state)
+{
+ // Called by master write thread
+ internalNotifyDiskMemUsage(state);
+}
+
+/**
+ * IClusterStateChangedHandler callback.
+ *
+ * Blocks the job (reason CLUSTER_STATE) while the node is retired, and lifts
+ * that block again once the node is no longer retired.
+ */
+void
+LidSpaceCompactionJobBase::notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc)
+{
+    // Called by master write thread
+    if (newCalc->nodeRetired()) {
+        LOG(info, "%s: Lid space compaction is blocked as node is retired", _handler.getName().c_str());
+        setBlocked(BlockedReason::CLUSTER_STATE);
+        return;
+    }
+    if (isBlocked(BlockedReason::CLUSTER_STATE)) {
+        LOG(info, "%s: Lid space compaction is un-blocked as node is no longer retired", _handler.getName().c_str());
+        unBlock(BlockedReason::CLUSTER_STATE);
+    }
+}
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h
new file mode 100644
index 00000000000..bfce6d55c89
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h
@@ -0,0 +1,72 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "blockable_maintenance_job.h"
+#include "document_db_maintenance_config.h"
+#include "i_disk_mem_usage_listener.h"
+#include "iclusterstatechangedhandler.h"
+#include <vespa/searchlib/common/idocumentmetastore.h>
+
+namespace proton {
+ class IDiskMemUsageNotifier;
+ class IClusterStateChangedNotifier;
+ class IOperationStorer;
+ class ILidSpaceCompactionHandler;
+ class IDocumentScanIterator;
+ class RemoveOperationsRateTracker;
+}
+
+namespace proton {
+
+/**
+ * Common base for jobs that regularly check whether lid space compaction
+ * should be performed for the given handler.
+ *
+ * Compaction is handled by moving documents from high lids to low free lids.
+ * A handler is typically working over a single document sub db.
+ *
+ * This class owns the overall state machine (run()), the blocking logic and
+ * the final lid space compaction. Subclasses implement scanDocuments(), the
+ * step that picks the next document and moves it.
+ */
+class LidSpaceCompactionJobBase : public BlockableMaintenanceJob,
+ public IDiskMemUsageListener,
+ public IClusterStateChangedHandler
+{
+private:
+ // Copy of the compaction config (bloat limits, remove block rates).
+ const DocumentDBLidSpaceCompactionConfig _cfg;
+protected:
+ ILidSpaceCompactionHandler &_handler;
+ IOperationStorer &_opStorer;
+ // Active scan iterator; empty when no scan is in progress.
+ std::unique_ptr<IDocumentScanIterator> _scanItr;
+private:
+ IDiskMemUsageNotifier &_diskMemUsageNotifier;
+ IClusterStateChangedNotifier &_clusterStateChangedNotifier;
+ std::shared_ptr<RemoveOperationsRateTracker> _ops_rate_tracker;
+ // Set while compaction is paused because of ongoing remove operations.
+ bool _is_disabled;
+ // Set when a scan has finished and the lid space should be compacted on a later run.
+ bool _shouldCompactLidSpace;
+
+
+ bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const;
+ bool shouldRestartScanDocuments(const search::LidUsageStats &stats) const;
+ // Performs one scan step; returns true when no more work should be done in this round.
+ virtual bool scanDocuments(const search::LidUsageStats &stats) = 0;
+ void compactLidSpace(const search::LidUsageStats &stats);
+ bool remove_batch_is_ongoing() const;
+ bool remove_is_ongoing() const;
+protected:
+ // Helpers for scanDocuments() implementations.
+ search::DocumentMetaData getNextDocument(const search::LidUsageStats &stats, bool retryLastDocument);
+ bool scanDocumentsPost();
+public:
+ LidSpaceCompactionJobBase(const DocumentDBLidSpaceCompactionConfig &config,
+ ILidSpaceCompactionHandler &handler,
+ IOperationStorer &opStorer,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
+ const BlockableMaintenanceJobConfig &blockableConfig,
+ IClusterStateChangedNotifier &clusterStateChangedNotifier,
+ bool nodeRetired);
+ ~LidSpaceCompactionJobBase() override;
+
+ // Implements IDiskMemUsageListener
+ void notifyDiskMemUsage(DiskMemUsageState state) override;
+ // Implements IClusterStateChangedHandler
+ void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override;
+ // Implements IMaintenanceJob
+ bool run() override;
+};
+
+} // namespace proton
+
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
new file mode 100644
index 00000000000..49a782df7a3
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
@@ -0,0 +1,70 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "lid_space_compaction_job_take2.h"
+#include "i_document_scan_iterator.h"
+#include "i_lid_space_compaction_handler.h"
+#include "i_operation_storer.h"
+#include <vespa/searchcore/proton/feedoperation/moveoperation.h>
+#include <vespa/persistence/spi/bucket_tasks.h>
+#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <cassert>
+
+using search::DocumentMetaData;
+using search::LidUsageStats;
+using storage::spi::makeBucketTask;
+using storage::spi::Bucket;
+
+namespace proton::lidspace {
+
+/**
+ * Performs one scan step: fetches the next document from the scan iterator
+ * and, if it is valid, hands a task to the bucket executor that will move it.
+ *
+ * @return false when the executor returned a non-empty result (treated as
+ *         failure) or when more scanning remains; true when the job has
+ *         become blocked on outstanding operations.
+ */
+bool
+CompactionJob::scanDocuments(const LidUsageStats &stats)
+{
+ if (_scanItr->valid()) {
+ DocumentMetaData document = getNextDocument(stats, false);
+ if (document.valid()) {
+ Bucket metaBucket(document::Bucket(document::BucketSpace::placeHolder(), document.bucketId));
+ // Token from the move operations limiter; it is kept alive until the move is done.
+ IDestructorCallback::SP context = _moveOpsLimiter->beginOperation();
+ // NOTE(review): the task captures 'this' and a copy of the meta data; presumably the
+ // executor may run it later, so the job must outlive the task - confirm.
+ auto failed = _bucketExecutor.execute(metaBucket, makeBucketTask([this, meta=document, opsTracker=std::move(context)] (const Bucket & bucket, std::shared_ptr<IDestructorCallback> onDone) {
+ assert(bucket.getBucketId() == meta.bucketId);
+ // Bundle the limiter token and the executor's done-callback so both live until the move completes.
+ using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallback::SP, IDestructorCallback::SP>>;
+ moveDocument(meta, std::make_shared<DoneContext>(std::make_pair(std::move(opsTracker), std::move(onDone))));
+ }));
+ // Presumably execute() hands the task back when it could not be accepted - TODO confirm.
+ if (failed) return false;
+ if (isBlocked(BlockedReason::OUTSTANDING_OPS)) {
+ return true;
+ }
+ }
+ }
+ return scanDocumentsPost();
+}
+
+/**
+ * Moves a single document to the lowest free lid, provided it is still where
+ * the scan found it.
+ *
+ * The meta data is looked up again for the lid; if the lid or bucket id no
+ * longer matches what was seen during the scan, nothing is done.
+ *
+ * @param metaThen Meta data captured when the document was scanned.
+ * @param context  Done-context passed along with the stored and handled move operation.
+ */
+void
+CompactionJob::moveDocument(const search::DocumentMetaData & metaThen, std::shared_ptr<IDestructorCallback> context) {
+ // NOTE(review): _scanItr is dereferenced unchecked here, while scanDocumentsPost() can
+ // reset it to empty; confirm this cannot run after the scan has finished.
+ search::DocumentMetaData metaNow = _scanItr->getMetaData(metaThen.lid);
+ if (metaNow.lid != metaThen.lid) return;
+ if (metaNow.bucketId != metaThen.bucketId) return;
+
+ // The target lid is taken from the lid status at the time of the move, not at scan time.
+ MoveOperation::UP op = _handler.createMoveOperation(metaNow, _handler.getLidStatus().getLowestFreeLid());
+ if (!op) return;
+
+ // The operation is appended to the operation storer before the handler applies it.
+ _opStorer.appendOperation(*op, context);
+ _handler.handleMove(*op, std::move(context));
+}
+
+/**
+ * Forwards the common arguments to LidSpaceCompactionJobBase and keeps a
+ * reference to the bucket executor used for scheduling document moves.
+ */
+CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
+ ILidSpaceCompactionHandler &handler,
+ IOperationStorer &opStorer,
+ BucketExecutor & bucketExecutor,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
+ const BlockableMaintenanceJobConfig &blockableConfig,
+ IClusterStateChangedNotifier &clusterStateChangedNotifier,
+ bool nodeRetired)
+ : LidSpaceCompactionJobBase(config, handler, opStorer, diskMemUsageNotifier,
+ blockableConfig, clusterStateChangedNotifier, nodeRetired),
+ _bucketExecutor(bucketExecutor)
+{
+}
+
+// Defaulted: the only member added by this class is a reference to the bucket executor.
+CompactionJob::~CompactionJob() = default;
+
+} // namespace proton::lidspace
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
new file mode 100644
index 00000000000..f79a3adc7d0
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
@@ -0,0 +1,45 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "lid_space_compaction_job_base.h"
+
+namespace storage::spi { class BucketExecutor;}
+namespace proton {
+ class IDiskMemUsageNotifier;
+ class IClusterStateChangedNotifier;
+}
+
+namespace proton::lidspace {
+
+/**
+ * Job that regularly checks whether lid space compaction should be performed
+ * for the given handler.
+ *
+ * Compaction is handled by moving documents from high lids to low free lids.
+ * A handler is typically working over a single document sub db.
+ *
+ * This variant schedules each document move as a bucket task on a
+ * BucketExecutor, so the bucket lock is held by the executor while the
+ * document is moved.
+ */
+class CompactionJob : public LidSpaceCompactionJobBase
+{
+private:
+ using BucketExecutor = storage::spi::BucketExecutor;
+ using IDestructorCallback = vespalib::IDestructorCallback;
+ // Executor that runs the move task for the document's bucket.
+ BucketExecutor &_bucketExecutor;
+
+ // Schedules the move of the next scanned document on the bucket executor.
+ bool scanDocuments(const search::LidUsageStats &stats) override;
+ // Invoked from the bucket task; re-checks the document's meta data and performs the move.
+ void moveDocument(const search::DocumentMetaData & meta, std::shared_ptr<IDestructorCallback> onDone);
+
+public:
+ CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
+ ILidSpaceCompactionHandler &handler,
+ IOperationStorer &opStorer,
+ BucketExecutor & bucketExecutor,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
+ const BlockableMaintenanceJobConfig &blockableConfig,
+ IClusterStateChangedNotifier &clusterStateChangedNotifier,
+ bool nodeRetired);
+ ~CompactionJob() override;
+};
+
+} // namespace proton::lidspace
+