diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-19 22:09:40 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-22 11:57:59 +0000 |
commit | 9c70574815a819ba3838d6ac86abd548077002e6 (patch) | |
tree | 490a7a78e929192e6cbd2cdccbd4109409bbc581 /searchcore | |
parent | e41754597890c4611980fee95e8aec8f9b29e476 (diff) |
Make a lidspace job that uses a bucket executor to hold the bucket lock.
Diffstat (limited to 'searchcore')
11 files changed, 399 insertions, 195 deletions
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp index c7239eaba16..b4c878b26eb 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp @@ -72,6 +72,9 @@ struct MyScanIterator : public IDocumentScanIterator { } return search::DocumentMetaData(); } + search::DocumentMetaData getMetaData(uint32_t lid) const override { + return search::DocumentMetaData(lid, TIMESTAMP_1, BUCKET_ID_1, GID_1); + } }; struct MyHandler : public ILidSpaceCompactionHandler { diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index 73b7404ce31..26fba8a780f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -58,6 +58,8 @@ vespa_add_library(searchcore_server STATIC job_tracked_maintenance_job.cpp lid_space_compaction_handler.cpp lid_space_compaction_job.cpp + lid_space_compaction_job_base.cpp + lid_space_compaction_job_take2.cpp maintenance_controller_explorer.cpp maintenance_jobs_injector.cpp maintenancecontroller.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.cpp b/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.cpp index 0332303abae..35c03ef73d3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.cpp @@ -39,4 +39,14 @@ DocumentScanIterator::next(uint32_t compactLidLimit, bool retry) return DocumentMetaData(); } +search::DocumentMetaData +DocumentScanIterator::getMetaData(uint32_t lid) const { + if (_metaStore.validLid(lid)) { + const RawDocumentMetaData &metaData = _metaStore.getRawMetaData(_lastLid); + return DocumentMetaData(_lastLid, metaData.getTimestamp(), + metaData.getBucketId(), metaData.getGid()); + } + return DocumentMetaData(); +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.h b/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.h index 62dff0a7d85..a465ca46fa4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.h +++ b/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.h @@ -22,6 +22,7 @@ public: DocumentScanIterator(const IDocumentMetaStore &_metaStore); bool valid() const override; search::DocumentMetaData next(uint32_t compactLidLimit, bool retry) override; + search::DocumentMetaData getMetaData(uint32_t lid) const override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/i_document_scan_iterator.h b/searchcore/src/vespa/searchcore/proton/server/i_document_scan_iterator.h index bcc52ffb475..2e66c2bf469 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_document_scan_iterator.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_document_scan_iterator.h @@ -30,6 +30,8 @@ struct IDocumentScanIterator * @param retry Whether we should start the scan with the previous returned document. */ virtual search::DocumentMetaData next(uint32_t compactLidLimit, bool retry) = 0; + + virtual search::DocumentMetaData getMetaData(uint32_t lid) const = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp index 228c1e4355e..c752b7950c3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp @@ -1,20 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "i_disk_mem_usage_notifier.h" -#include "iclusterstatechangednotifier.h" -#include "imaintenancejobrunner.h" #include "lid_space_compaction_job.h" #include "i_document_scan_iterator.h" #include "ifrozenbuckethandler.h" +#include "i_lid_space_compaction_handler.h" +#include "i_operation_storer.h" #include <vespa/searchcore/proton/feedoperation/moveoperation.h> -#include <vespa/searchcore/proton/feedoperation/compact_lid_space_operation.h> -#include <vespa/searchcore/proton/common/eventlogger.h> -#include <vespa/vespalib/util/destructor_callbacks.h> -#include <vespa/vespalib/util/gate.h> -#include <cassert> - -#include <vespa/log/log.h> -LOG_SETUP(".proton.server.lid_space_compaction_job"); using search::DocumentMetaData; using search::LidUsageStats; @@ -22,33 +13,11 @@ using search::LidUsageStats; namespace proton { bool -LidSpaceCompactionJob::hasTooMuchLidBloat(const LidUsageStats &stats) const -{ - return (stats.getLidBloat() >= _cfg.getAllowedLidBloat() && - stats.getLidBloatFactor() >= _cfg.getAllowedLidBloatFactor() && - stats.getLidLimit() > stats.getLowestFreeLid()); -} - -bool -LidSpaceCompactionJob::shouldRestartScanDocuments(const LidUsageStats &stats) const -{ - return (stats.getUsedLids() + _cfg.getAllowedLidBloat()) < stats.getHighestUsedLid() && - stats.getLowestFreeLid() < stats.getHighestUsedLid(); -} - -DocumentMetaData -LidSpaceCompactionJob::getNextDocument(const LidUsageStats &stats) -{ - DocumentMetaData document = _scanItr->next(std::max(stats.getLowestFreeLid(), stats.getUsedLids()), _retryFrozenDocument); - _retryFrozenDocument = false; - return document; -} - -bool LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats) { if (_scanItr->valid()) { - DocumentMetaData document = getNextDocument(stats); + DocumentMetaData document = getNextDocument(stats, _retryFrozenDocument); + _retryFrozenDocument = false; if (document.valid()) { IFrozenBucketHandler::ExclusiveBucketGuard::UP bucketGuard = _frozenHandler.acquireExclusiveBucket(document.bucketId); if ( ! bucketGuard ) { @@ -57,7 +26,7 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats) _retryFrozenDocument = true; return true; } else { - MoveOperation::UP op = _handler.createMoveOperation(document, stats.getLowestFreeLid()); + std::unique_ptr<MoveOperation> op = _handler.createMoveOperation(document, stats.getLowestFreeLid()); if ( ! op ) { return false; } @@ -70,40 +39,7 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats) } } } - if (!_scanItr->valid()){ - if (shouldRestartScanDocuments(_handler.getLidStatus())) { - _scanItr = _handler.getIterator(); - } else { - _scanItr = IDocumentScanIterator::UP(); - _shouldCompactLidSpace = true; - } - } - return false; // more work to do (scan documents or compact lid space) -} - -void -LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats) -{ - uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1; - CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit); - vespalib::Gate gate; - auto commit_result = _opStorer.appendAndCommitOperation(op, std::make_shared<vespalib::GateCallback>(gate)); - gate.await(); - _handler.handleCompactLidSpace(op, std::make_shared<vespalib::KeepAlive<decltype(commit_result)>>(std::move(commit_result))); - EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit); - _shouldCompactLidSpace = false; -} - -bool -LidSpaceCompactionJob::remove_batch_is_ongoing() const -{ - return _ops_rate_tracker->remove_batch_above_threshold(); -} - -bool -LidSpaceCompactionJob::remove_is_ongoing() const -{ - return _ops_rate_tracker->remove_above_threshold(); + return scanDocumentsPost(); } LidSpaceCompactionJob::LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionConfig &config, @@ -114,94 +50,13 @@ LidSpaceCompactionJob::LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionC const BlockableMaintenanceJobConfig &blockableConfig, IClusterStateChangedNotifier &clusterStateChangedNotifier, bool nodeRetired) - : BlockableMaintenanceJob("lid_space_compaction." + handler.getName(), - config.getDelay(), config.getInterval(), blockableConfig), - _cfg(config), - _handler(handler), - _opStorer(opStorer), + : LidSpaceCompactionJobBase(config, handler, opStorer, diskMemUsageNotifier, + blockableConfig, clusterStateChangedNotifier, nodeRetired), _frozenHandler(frozenHandler), - _scanItr(), - _retryFrozenDocument(false), - _shouldCompactLidSpace(false), - _diskMemUsageNotifier(diskMemUsageNotifier), - _clusterStateChangedNotifier(clusterStateChangedNotifier), - _ops_rate_tracker(std::make_shared<RemoveOperationsRateTracker>(config.get_remove_batch_block_rate(), - config.get_remove_block_rate())), - _is_disabled(false) -{ - _diskMemUsageNotifier.addDiskMemUsageListener(this); - _clusterStateChangedNotifier.addClusterStateChangedHandler(this); - if (nodeRetired) { - setBlocked(BlockedReason::CLUSTER_STATE); - } - handler.set_operation_listener(_ops_rate_tracker); -} - -LidSpaceCompactionJob::~LidSpaceCompactionJob() -{ - _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); - _diskMemUsageNotifier.removeDiskMemUsageListener(this); -} - -bool -LidSpaceCompactionJob::run() + _retryFrozenDocument(false) { - if (isBlocked()) { - return true; // indicate work is done since no work can be done - } - LidUsageStats stats = _handler.getLidStatus(); - if (remove_batch_is_ongoing()) { - // Note that we don't set the job as blocked as the decision to un-block it is not driven externally. - LOG(info, "%s: Lid space compaction is disabled while remove batch (delete buckets) is ongoing", - _handler.getName().c_str()); - _is_disabled = true; - return true; - } - if (remove_is_ongoing()) { - // Note that we don't set the job as blocked as the decision to un-block it is not driven externally. - LOG(info, "%s: Lid space compaction is disabled while remove operations are ongoing", - _handler.getName().c_str()); - _is_disabled = true; - return true; - } - if (_is_disabled) { - LOG(info, "%s: Lid space compaction is re-enabled as remove operations are no longer ongoing", - _handler.getName().c_str()); - _is_disabled = false; - } - if (_scanItr) { - return scanDocuments(stats); - } else if (_shouldCompactLidSpace) { - compactLidSpace(stats); - } else if (hasTooMuchLidBloat(stats)) { - assert(!_scanItr); - _scanItr = _handler.getIterator(); - return scanDocuments(stats); - } - return true; } -void -LidSpaceCompactionJob::notifyDiskMemUsage(DiskMemUsageState state) -{ - // Called by master write thread - internalNotifyDiskMemUsage(state); -} +LidSpaceCompactionJob::~LidSpaceCompactionJob() = default; -void -LidSpaceCompactionJob::notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) -{ - // Called by master write thread - bool nodeRetired = newCalc->nodeRetired(); - if (!nodeRetired) { - if (isBlocked(BlockedReason::CLUSTER_STATE)) { - LOG(info, "%s: Lid space compaction is un-blocked as node is no longer retired", _handler.getName().c_str()); - unBlock(BlockedReason::CLUSTER_STATE); - } - } else { - LOG(info, "%s: Lid space compaction is blocked as node is retired", _handler.getName().c_str()); - setBlocked(BlockedReason::CLUSTER_STATE); - } } - -} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h index e9d5de58323..93c4ccfbc20 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h @@ -1,21 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "blockable_maintenance_job.h" -#include "document_db_maintenance_config.h" -#include "i_disk_mem_usage_listener.h" -#include "i_lid_space_compaction_handler.h" -#include "i_operation_storer.h" -#include "ibucketstatecalculator.h" -#include "iclusterstatechangedhandler.h" -#include "iclusterstatechangednotifier.h" -#include "remove_operations_rate_tracker.h" +#include "lid_space_compaction_job_base.h" namespace proton { class IFrozenBucketHandler; -class IDiskMemUsageNotifier; -class IClusterStateChangedNotifier; /** * Job that regularly checks whether lid space compaction should be performed @@ -24,30 +14,13 @@ class IClusterStateChangedNotifier; * Compaction is handled by moving documents from high lids to low free lids. * A handler is typically working over a single document sub db. */ -class LidSpaceCompactionJob : public BlockableMaintenanceJob, - public IDiskMemUsageListener, - public IClusterStateChangedHandler +class LidSpaceCompactionJob : public LidSpaceCompactionJobBase { private: - const DocumentDBLidSpaceCompactionConfig _cfg; - ILidSpaceCompactionHandler &_handler; - IOperationStorer &_opStorer; IFrozenBucketHandler &_frozenHandler; - std::unique_ptr<IDocumentScanIterator> _scanItr; bool _retryFrozenDocument; - bool _shouldCompactLidSpace; - IDiskMemUsageNotifier &_diskMemUsageNotifier; - IClusterStateChangedNotifier &_clusterStateChangedNotifier; - std::shared_ptr<RemoveOperationsRateTracker> _ops_rate_tracker; - bool _is_disabled; - bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const; - bool shouldRestartScanDocuments(const search::LidUsageStats &stats) const; - search::DocumentMetaData getNextDocument(const search::LidUsageStats &stats); - bool scanDocuments(const search::LidUsageStats &stats); - void compactLidSpace(const search::LidUsageStats &stats); - bool remove_batch_is_ongoing() const; - bool remove_is_ongoing() const; + bool scanDocuments(const search::LidUsageStats &stats) override; public: LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionConfig &config, @@ -58,16 +31,7 @@ public: const BlockableMaintenanceJobConfig &blockableConfig, IClusterStateChangedNotifier &clusterStateChangedNotifier, bool nodeRetired); - ~LidSpaceCompactionJob(); - - // Implements IDiskMemUsageListener - void notifyDiskMemUsage(DiskMemUsageState state) override; - - // Implements IClusterStateChangedNofifier - void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; - - // Implements IMaintenanceJob - bool run() override; + ~LidSpaceCompactionJob() override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp new file mode 100644 index 00000000000..671a9c327d0 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp @@ -0,0 +1,180 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "lid_space_compaction_job_base.h" +#include "imaintenancejobrunner.h" +#include "i_document_scan_iterator.h" +#include "i_lid_space_compaction_handler.h" +#include "i_operation_storer.h" +#include "remove_operations_rate_tracker.h" +#include "i_disk_mem_usage_notifier.h" +#include "iclusterstatechangednotifier.h" +#include <vespa/searchcore/proton/feedoperation/compact_lid_space_operation.h> +#include <vespa/searchcore/proton/common/eventlogger.h> +#include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/vespalib/util/gate.h> +#include <cassert> + +#include <vespa/log/log.h> +LOG_SETUP(".proton.server.lid_space_compaction_job"); + +using search::DocumentMetaData; +using search::LidUsageStats; + +namespace proton { + +bool +LidSpaceCompactionJobBase::hasTooMuchLidBloat(const LidUsageStats &stats) const +{ + return (stats.getLidBloat() >= _cfg.getAllowedLidBloat() && + stats.getLidBloatFactor() >= _cfg.getAllowedLidBloatFactor() && + stats.getLidLimit() > stats.getLowestFreeLid()); +} + +bool +LidSpaceCompactionJobBase::shouldRestartScanDocuments(const LidUsageStats &stats) const +{ + return (stats.getUsedLids() + _cfg.getAllowedLidBloat()) < stats.getHighestUsedLid() && + stats.getLowestFreeLid() < stats.getHighestUsedLid(); +} + +DocumentMetaData +LidSpaceCompactionJobBase::getNextDocument(const LidUsageStats &stats, bool retryLastDocument) +{ + return _scanItr->next(std::max(stats.getLowestFreeLid(), stats.getUsedLids()), retryLastDocument); +} + +bool +LidSpaceCompactionJobBase::scanDocumentsPost() +{ + if (!_scanItr->valid()) { + if (shouldRestartScanDocuments(_handler.getLidStatus())) { + _scanItr = _handler.getIterator(); + } else { + _scanItr = IDocumentScanIterator::UP(); + _shouldCompactLidSpace = true; + } + } + return false; // more work to do (scan documents or compact lid space) +} + +void +LidSpaceCompactionJobBase::compactLidSpace(const LidUsageStats &stats) +{ + uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1; + CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit); + vespalib::Gate gate; + auto commit_result = _opStorer.appendAndCommitOperation(op, std::make_shared<vespalib::GateCallback>(gate)); + gate.await(); + _handler.handleCompactLidSpace(op, std::make_shared<vespalib::KeepAlive<decltype(commit_result)>>(std::move(commit_result))); + EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit); + _shouldCompactLidSpace = false; +} + +bool +LidSpaceCompactionJobBase::remove_batch_is_ongoing() const +{ + return _ops_rate_tracker->remove_batch_above_threshold(); +} + +bool +LidSpaceCompactionJobBase::remove_is_ongoing() const +{ + return _ops_rate_tracker->remove_above_threshold(); +} + +LidSpaceCompactionJobBase::LidSpaceCompactionJobBase(const DocumentDBLidSpaceCompactionConfig &config, + ILidSpaceCompactionHandler &handler, + IOperationStorer &opStorer, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + bool nodeRetired) + : BlockableMaintenanceJob("lid_space_compaction." + handler.getName(), + config.getDelay(), config.getInterval(), blockableConfig), + _cfg(config), + _handler(handler), + _opStorer(opStorer), + _scanItr(), + _diskMemUsageNotifier(diskMemUsageNotifier), + _clusterStateChangedNotifier(clusterStateChangedNotifier), + _ops_rate_tracker(std::make_shared<RemoveOperationsRateTracker>(config.get_remove_batch_block_rate(), + config.get_remove_block_rate())), + _is_disabled(false), + _shouldCompactLidSpace(false) +{ + _diskMemUsageNotifier.addDiskMemUsageListener(this); + _clusterStateChangedNotifier.addClusterStateChangedHandler(this); + if (nodeRetired) { + setBlocked(BlockedReason::CLUSTER_STATE); + } + handler.set_operation_listener(_ops_rate_tracker); +} + +LidSpaceCompactionJobBase::~LidSpaceCompactionJobBase() +{ + _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); + _diskMemUsageNotifier.removeDiskMemUsageListener(this); +} + +bool +LidSpaceCompactionJobBase::run() +{ + if (isBlocked()) { + return true; // indicate work is done since no work can be done + } + LidUsageStats stats = _handler.getLidStatus(); + if (remove_batch_is_ongoing()) { + // Note that we don't set the job as blocked as the decision to un-block it is not driven externally. + LOG(info, "%s: Lid space compaction is disabled while remove batch (delete buckets) is ongoing", + _handler.getName().c_str()); + _is_disabled = true; + return true; + } + if (remove_is_ongoing()) { + // Note that we don't set the job as blocked as the decision to un-block it is not driven externally. + LOG(info, "%s: Lid space compaction is disabled while remove operations are ongoing", + _handler.getName().c_str()); + _is_disabled = true; + return true; + } + if (_is_disabled) { + LOG(info, "%s: Lid space compaction is re-enabled as remove operations are no longer ongoing", + _handler.getName().c_str()); + _is_disabled = false; + } + if (_scanItr) { + return scanDocuments(stats); + } else if (_shouldCompactLidSpace) { + compactLidSpace(stats); + } else if (hasTooMuchLidBloat(stats)) { + assert(!_scanItr); + _scanItr = _handler.getIterator(); + return scanDocuments(stats); + } + return true; +} + +void +LidSpaceCompactionJobBase::notifyDiskMemUsage(DiskMemUsageState state) +{ + // Called by master write thread + internalNotifyDiskMemUsage(state); +} + +void +LidSpaceCompactionJobBase::notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) +{ + // Called by master write thread + bool nodeRetired = newCalc->nodeRetired(); + if (!nodeRetired) { + if (isBlocked(BlockedReason::CLUSTER_STATE)) { + LOG(info, "%s: Lid space compaction is un-blocked as node is no longer retired", _handler.getName().c_str()); + unBlock(BlockedReason::CLUSTER_STATE); + } + } else { + LOG(info, "%s: Lid space compaction is blocked as node is retired", _handler.getName().c_str()); + setBlocked(BlockedReason::CLUSTER_STATE); + } +} + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h new file mode 100644 index 00000000000..bfce6d55c89 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h @@ -0,0 +1,72 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "blockable_maintenance_job.h" +#include "document_db_maintenance_config.h" +#include "i_disk_mem_usage_listener.h" +#include "iclusterstatechangedhandler.h" +#include <vespa/searchlib/common/idocumentmetastore.h> + +namespace proton { + class IDiskMemUsageNotifier; + class IClusterStateChangedNotifier; + class IOperationStorer; + class ILidSpaceCompactionHandler; + class IDocumentScanIterator; + class RemoveOperationsRateTracker; +} + +namespace proton { + +/** + * Job that regularly checks whether lid space compaction should be performed + * for the given handler. + * + * Compaction is handled by moving documents from high lids to low free lids. + * A handler is typically working over a single document sub db. + */ +class LidSpaceCompactionJobBase : public BlockableMaintenanceJob, + public IDiskMemUsageListener, + public IClusterStateChangedHandler +{ +private: + const DocumentDBLidSpaceCompactionConfig _cfg; +protected: + ILidSpaceCompactionHandler &_handler; + IOperationStorer &_opStorer; + std::unique_ptr<IDocumentScanIterator> _scanItr; +private: + IDiskMemUsageNotifier &_diskMemUsageNotifier; + IClusterStateChangedNotifier &_clusterStateChangedNotifier; + std::shared_ptr<RemoveOperationsRateTracker> _ops_rate_tracker; + bool _is_disabled; + bool _shouldCompactLidSpace; + + + bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const; + bool shouldRestartScanDocuments(const search::LidUsageStats &stats) const; + virtual bool scanDocuments(const search::LidUsageStats &stats) = 0; + void compactLidSpace(const search::LidUsageStats &stats); + bool remove_batch_is_ongoing() const; + bool remove_is_ongoing() const; +protected: + search::DocumentMetaData getNextDocument(const search::LidUsageStats &stats, bool retryLastDocument); + bool scanDocumentsPost(); +public: + LidSpaceCompactionJobBase(const DocumentDBLidSpaceCompactionConfig &config, + ILidSpaceCompactionHandler &handler, + IOperationStorer &opStorer, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + bool nodeRetired); + ~LidSpaceCompactionJobBase() override; + + void notifyDiskMemUsage(DiskMemUsageState state) override; + void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; + bool run() override; +}; + +} // namespace proton + diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp new file mode 100644 index 00000000000..49a782df7a3 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp @@ -0,0 +1,70 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "lid_space_compaction_job_take2.h" +#include "i_document_scan_iterator.h" +#include "i_lid_space_compaction_handler.h" +#include "i_operation_storer.h" +#include <vespa/searchcore/proton/feedoperation/moveoperation.h> +#include <vespa/persistence/spi/bucket_tasks.h> +#include <vespa/vespalib/util/destructor_callbacks.h> +#include <cassert> + +using search::DocumentMetaData; +using search::LidUsageStats; +using storage::spi::makeBucketTask; +using storage::spi::Bucket; + +namespace proton::lidspace { + +bool +CompactionJob::scanDocuments(const LidUsageStats &stats) +{ + if (_scanItr->valid()) { + DocumentMetaData document = getNextDocument(stats, false); + if (document.valid()) { + Bucket metaBucket(document::Bucket(document::BucketSpace::placeHolder(), document.bucketId)); + IDestructorCallback::SP context = _moveOpsLimiter->beginOperation(); + auto failed = _bucketExecutor.execute(metaBucket, makeBucketTask([this, meta=document, opsTracker=std::move(context)] (const Bucket & bucket, std::shared_ptr<IDestructorCallback> onDone) { + assert(bucket.getBucketId() == meta.bucketId); + using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallback::SP, IDestructorCallback::SP>>; + moveDocument(meta, std::make_shared<DoneContext>(std::make_pair(std::move(opsTracker), std::move(onDone)))); + })); + if (failed) return false; + if (isBlocked(BlockedReason::OUTSTANDING_OPS)) { + return true; + } + } + } + return scanDocumentsPost(); +} + +void +CompactionJob::moveDocument(const search::DocumentMetaData & metaThen, std::shared_ptr<IDestructorCallback> context) { + search::DocumentMetaData metaNow = _scanItr->getMetaData(metaThen.lid); + if (metaNow.lid != metaThen.lid) return; + if (metaNow.bucketId != metaThen.bucketId) return; + + MoveOperation::UP op = _handler.createMoveOperation(metaNow, _handler.getLidStatus().getLowestFreeLid()); + if (!op) return; + + _opStorer.appendOperation(*op, context); + _handler.handleMove(*op, std::move(context)); +} + +CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, + ILidSpaceCompactionHandler &handler, + IOperationStorer &opStorer, + BucketExecutor & bucketExecutor, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + bool nodeRetired) + : LidSpaceCompactionJobBase(config, handler, opStorer, diskMemUsageNotifier, + blockableConfig, clusterStateChangedNotifier, nodeRetired), + _bucketExecutor(bucketExecutor) +{ +} + +CompactionJob::~CompactionJob() = default; + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h new file mode 100644 index 00000000000..f79a3adc7d0 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h @@ -0,0 +1,45 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "lid_space_compaction_job_base.h" + +namespace storage::spi { class BucketExecutor;} +namespace proton { + class IDiskMemUsageNotifier; + class IClusterStateChangedNotifier; +} + +namespace proton::lidspace { + +/** + * Job that regularly checks whether lid space compaction should be performed + * for the given handler. + * + * Compaction is handled by moving documents from high lids to low free lids. + * A handler is typically working over a single document sub db. + */ +class CompactionJob : public LidSpaceCompactionJobBase +{ +private: + using BucketExecutor = storage::spi::BucketExecutor; + using IDestructorCallback = vespalib::IDestructorCallback; + BucketExecutor &_bucketExecutor; + + bool scanDocuments(const search::LidUsageStats &stats) override; + void moveDocument(const search::DocumentMetaData & meta, std::shared_ptr<IDestructorCallback> onDone); + +public: + CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, + ILidSpaceCompactionHandler &handler, + IOperationStorer &opStorer, + BucketExecutor & bucketExecutor, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + bool nodeRetired); + ~CompactionJob() override; +}; + +} // namespace proton + |