From 9c70574815a819ba3838d6ac86abd548077002e6 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 19 Jan 2021 22:09:40 +0000 Subject: Make a lidspace job that uses a bucket executor to hold the bucket lock. --- .../lid_space_compaction_test.cpp | 3 + .../vespa/searchcore/proton/server/CMakeLists.txt | 2 + .../proton/server/document_scan_iterator.cpp | 10 ++ .../proton/server/document_scan_iterator.h | 1 + .../proton/server/i_document_scan_iterator.h | 2 + .../proton/server/lid_space_compaction_job.cpp | 165 ++----------------- .../proton/server/lid_space_compaction_job.h | 44 +---- .../server/lid_space_compaction_job_base.cpp | 180 +++++++++++++++++++++ .../proton/server/lid_space_compaction_job_base.h | 72 +++++++++ .../server/lid_space_compaction_job_take2.cpp | 70 ++++++++ .../proton/server/lid_space_compaction_job_take2.h | 45 ++++++ 11 files changed, 399 insertions(+), 195 deletions(-) create mode 100644 searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp create mode 100644 searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h create mode 100644 searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp create mode 100644 searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h (limited to 'searchcore') diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp index c7239eaba16..b4c878b26eb 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp @@ -72,6 +72,9 @@ struct MyScanIterator : public IDocumentScanIterator { } return search::DocumentMetaData(); } + search::DocumentMetaData getMetaData(uint32_t lid) const override { + return search::DocumentMetaData(lid, TIMESTAMP_1, BUCKET_ID_1, GID_1); + } }; struct MyHandler : public ILidSpaceCompactionHandler { diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index 73b7404ce31..26fba8a780f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -58,6 +58,8 @@ vespa_add_library(searchcore_server STATIC job_tracked_maintenance_job.cpp lid_space_compaction_handler.cpp lid_space_compaction_job.cpp + lid_space_compaction_job_base.cpp + lid_space_compaction_job_take2.cpp maintenance_controller_explorer.cpp maintenance_jobs_injector.cpp maintenancecontroller.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.cpp b/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.cpp index 0332303abae..35c03ef73d3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.cpp @@ -39,4 +39,14 @@ DocumentScanIterator::next(uint32_t compactLidLimit, bool retry) return DocumentMetaData(); } +search::DocumentMetaData +DocumentScanIterator::getMetaData(uint32_t lid) const { + if (_metaStore.validLid(lid)) { + const RawDocumentMetaData &metaData = _metaStore.getRawMetaData(_lastLid); + return DocumentMetaData(_lastLid, metaData.getTimestamp(), + metaData.getBucketId(), metaData.getGid()); + } + return DocumentMetaData(); +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.h b/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.h index 62dff0a7d85..a465ca46fa4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.h +++ b/searchcore/src/vespa/searchcore/proton/server/document_scan_iterator.h @@ -22,6 +22,7 @@ public: DocumentScanIterator(const IDocumentMetaStore &_metaStore); bool valid() const override; search::DocumentMetaData next(uint32_t compactLidLimit, bool retry) override; + search::DocumentMetaData getMetaData(uint32_t lid) const override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/i_document_scan_iterator.h b/searchcore/src/vespa/searchcore/proton/server/i_document_scan_iterator.h index bcc52ffb475..2e66c2bf469 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_document_scan_iterator.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_document_scan_iterator.h @@ -30,6 +30,8 @@ struct IDocumentScanIterator * @param retry Whether we should start the scan with the previous returned document. */ virtual search::DocumentMetaData next(uint32_t compactLidLimit, bool retry) = 0; + + virtual search::DocumentMetaData getMetaData(uint32_t lid) const = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp index 228c1e4355e..c752b7950c3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp @@ -1,54 +1,23 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "i_disk_mem_usage_notifier.h" -#include "iclusterstatechangednotifier.h" -#include "imaintenancejobrunner.h" #include "lid_space_compaction_job.h" #include "i_document_scan_iterator.h" #include "ifrozenbuckethandler.h" +#include "i_lid_space_compaction_handler.h" +#include "i_operation_storer.h" #include -#include -#include -#include -#include -#include - -#include -LOG_SETUP(".proton.server.lid_space_compaction_job"); using search::DocumentMetaData; using search::LidUsageStats; namespace proton { -bool -LidSpaceCompactionJob::hasTooMuchLidBloat(const LidUsageStats &stats) const -{ - return (stats.getLidBloat() >= _cfg.getAllowedLidBloat() && - stats.getLidBloatFactor() >= _cfg.getAllowedLidBloatFactor() && - stats.getLidLimit() > stats.getLowestFreeLid()); -} - -bool -LidSpaceCompactionJob::shouldRestartScanDocuments(const LidUsageStats &stats) const -{ - return (stats.getUsedLids() + _cfg.getAllowedLidBloat()) < stats.getHighestUsedLid() && - stats.getLowestFreeLid() < stats.getHighestUsedLid(); -} - -DocumentMetaData -LidSpaceCompactionJob::getNextDocument(const LidUsageStats &stats) -{ - DocumentMetaData document = _scanItr->next(std::max(stats.getLowestFreeLid(), stats.getUsedLids()), _retryFrozenDocument); - _retryFrozenDocument = false; - return document; -} - bool LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats) { if (_scanItr->valid()) { - DocumentMetaData document = getNextDocument(stats); + DocumentMetaData document = getNextDocument(stats, _retryFrozenDocument); + _retryFrozenDocument = false; if (document.valid()) { IFrozenBucketHandler::ExclusiveBucketGuard::UP bucketGuard = _frozenHandler.acquireExclusiveBucket(document.bucketId); if ( ! bucketGuard ) { @@ -57,7 +26,7 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats) _retryFrozenDocument = true; return true; } else { - MoveOperation::UP op = _handler.createMoveOperation(document, stats.getLowestFreeLid()); + std::unique_ptr op = _handler.createMoveOperation(document, stats.getLowestFreeLid()); if ( ! op ) { return false; } @@ -70,40 +39,7 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats) } } } - if (!_scanItr->valid()){ - if (shouldRestartScanDocuments(_handler.getLidStatus())) { - _scanItr = _handler.getIterator(); - } else { - _scanItr = IDocumentScanIterator::UP(); - _shouldCompactLidSpace = true; - } - } - return false; // more work to do (scan documents or compact lid space) -} - -void -LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats) -{ - uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1; - CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit); - vespalib::Gate gate; - auto commit_result = _opStorer.appendAndCommitOperation(op, std::make_shared(gate)); - gate.await(); - _handler.handleCompactLidSpace(op, std::make_shared>(std::move(commit_result))); - EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit); - _shouldCompactLidSpace = false; -} - -bool -LidSpaceCompactionJob::remove_batch_is_ongoing() const -{ - return _ops_rate_tracker->remove_batch_above_threshold(); -} - -bool -LidSpaceCompactionJob::remove_is_ongoing() const -{ - return _ops_rate_tracker->remove_above_threshold(); + return scanDocumentsPost(); } LidSpaceCompactionJob::LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionConfig &config, @@ -114,94 +50,13 @@ LidSpaceCompactionJob::LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionC const BlockableMaintenanceJobConfig &blockableConfig, IClusterStateChangedNotifier &clusterStateChangedNotifier, bool nodeRetired) - : BlockableMaintenanceJob("lid_space_compaction." + handler.getName(), - config.getDelay(), config.getInterval(), blockableConfig), - _cfg(config), - _handler(handler), - _opStorer(opStorer), + : LidSpaceCompactionJobBase(config, handler, opStorer, diskMemUsageNotifier, + blockableConfig, clusterStateChangedNotifier, nodeRetired), _frozenHandler(frozenHandler), - _scanItr(), - _retryFrozenDocument(false), - _shouldCompactLidSpace(false), - _diskMemUsageNotifier(diskMemUsageNotifier), - _clusterStateChangedNotifier(clusterStateChangedNotifier), - _ops_rate_tracker(std::make_shared(config.get_remove_batch_block_rate(), - config.get_remove_block_rate())), - _is_disabled(false) -{ - _diskMemUsageNotifier.addDiskMemUsageListener(this); - _clusterStateChangedNotifier.addClusterStateChangedHandler(this); - if (nodeRetired) { - setBlocked(BlockedReason::CLUSTER_STATE); - } - handler.set_operation_listener(_ops_rate_tracker); -} - -LidSpaceCompactionJob::~LidSpaceCompactionJob() -{ - _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); - _diskMemUsageNotifier.removeDiskMemUsageListener(this); -} - -bool -LidSpaceCompactionJob::run() + _retryFrozenDocument(false) { - if (isBlocked()) { - return true; // indicate work is done since no work can be done - } - LidUsageStats stats = _handler.getLidStatus(); - if (remove_batch_is_ongoing()) { - // Note that we don't set the job as blocked as the decision to un-block it is not driven externally. - LOG(info, "%s: Lid space compaction is disabled while remove batch (delete buckets) is ongoing", - _handler.getName().c_str()); - _is_disabled = true; - return true; - } - if (remove_is_ongoing()) { - // Note that we don't set the job as blocked as the decision to un-block it is not driven externally. - LOG(info, "%s: Lid space compaction is disabled while remove operations are ongoing", - _handler.getName().c_str()); - _is_disabled = true; - return true; - } - if (_is_disabled) { - LOG(info, "%s: Lid space compaction is re-enabled as remove operations are no longer ongoing", - _handler.getName().c_str()); - _is_disabled = false; - } - if (_scanItr) { - return scanDocuments(stats); - } else if (_shouldCompactLidSpace) { - compactLidSpace(stats); - } else if (hasTooMuchLidBloat(stats)) { - assert(!_scanItr); - _scanItr = _handler.getIterator(); - return scanDocuments(stats); - } - return true; } -void -LidSpaceCompactionJob::notifyDiskMemUsage(DiskMemUsageState state) -{ - // Called by master write thread - internalNotifyDiskMemUsage(state); -} +LidSpaceCompactionJob::~LidSpaceCompactionJob() = default; -void -LidSpaceCompactionJob::notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) -{ - // Called by master write thread - bool nodeRetired = newCalc->nodeRetired(); - if (!nodeRetired) { - if (isBlocked(BlockedReason::CLUSTER_STATE)) { - LOG(info, "%s: Lid space compaction is un-blocked as node is no longer retired", _handler.getName().c_str()); - unBlock(BlockedReason::CLUSTER_STATE); - } - } else { - LOG(info, "%s: Lid space compaction is blocked as node is retired", _handler.getName().c_str()); - setBlocked(BlockedReason::CLUSTER_STATE); - } } - -} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h index e9d5de58323..93c4ccfbc20 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h @@ -1,21 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "blockable_maintenance_job.h" -#include "document_db_maintenance_config.h" -#include "i_disk_mem_usage_listener.h" -#include "i_lid_space_compaction_handler.h" -#include "i_operation_storer.h" -#include "ibucketstatecalculator.h" -#include "iclusterstatechangedhandler.h" -#include "iclusterstatechangednotifier.h" -#include "remove_operations_rate_tracker.h" +#include "lid_space_compaction_job_base.h" namespace proton { class IFrozenBucketHandler; -class IDiskMemUsageNotifier; -class IClusterStateChangedNotifier; /** * Job that regularly checks whether lid space compaction should be performed @@ -24,30 +14,13 @@ class IClusterStateChangedNotifier; * Compaction is handled by moving documents from high lids to low free lids. * A handler is typically working over a single document sub db. */ -class LidSpaceCompactionJob : public BlockableMaintenanceJob, - public IDiskMemUsageListener, - public IClusterStateChangedHandler +class LidSpaceCompactionJob : public LidSpaceCompactionJobBase { private: - const DocumentDBLidSpaceCompactionConfig _cfg; - ILidSpaceCompactionHandler &_handler; - IOperationStorer &_opStorer; IFrozenBucketHandler &_frozenHandler; - std::unique_ptr _scanItr; bool _retryFrozenDocument; - bool _shouldCompactLidSpace; - IDiskMemUsageNotifier &_diskMemUsageNotifier; - IClusterStateChangedNotifier &_clusterStateChangedNotifier; - std::shared_ptr _ops_rate_tracker; - bool _is_disabled; - bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const; - bool shouldRestartScanDocuments(const search::LidUsageStats &stats) const; - search::DocumentMetaData getNextDocument(const search::LidUsageStats &stats); - bool scanDocuments(const search::LidUsageStats &stats); - void compactLidSpace(const search::LidUsageStats &stats); - bool remove_batch_is_ongoing() const; - bool remove_is_ongoing() const; + bool scanDocuments(const search::LidUsageStats &stats) override; public: LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionConfig &config, @@ -58,16 +31,7 @@ public: const BlockableMaintenanceJobConfig &blockableConfig, IClusterStateChangedNotifier &clusterStateChangedNotifier, bool nodeRetired); - ~LidSpaceCompactionJob(); - - // Implements IDiskMemUsageListener - void notifyDiskMemUsage(DiskMemUsageState state) override; - - // Implements IClusterStateChangedNofifier - void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; - - // Implements IMaintenanceJob - bool run() override; + ~LidSpaceCompactionJob() override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp new file mode 100644 index 00000000000..671a9c327d0 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp @@ -0,0 +1,180 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "lid_space_compaction_job_base.h" +#include "imaintenancejobrunner.h" +#include "i_document_scan_iterator.h" +#include "i_lid_space_compaction_handler.h" +#include "i_operation_storer.h" +#include "remove_operations_rate_tracker.h" +#include "i_disk_mem_usage_notifier.h" +#include "iclusterstatechangednotifier.h" +#include +#include +#include +#include +#include + +#include +LOG_SETUP(".proton.server.lid_space_compaction_job"); + +using search::DocumentMetaData; +using search::LidUsageStats; + +namespace proton { + +bool +LidSpaceCompactionJobBase::hasTooMuchLidBloat(const LidUsageStats &stats) const +{ + return (stats.getLidBloat() >= _cfg.getAllowedLidBloat() && + stats.getLidBloatFactor() >= _cfg.getAllowedLidBloatFactor() && + stats.getLidLimit() > stats.getLowestFreeLid()); +} + +bool +LidSpaceCompactionJobBase::shouldRestartScanDocuments(const LidUsageStats &stats) const +{ + return (stats.getUsedLids() + _cfg.getAllowedLidBloat()) < stats.getHighestUsedLid() && + stats.getLowestFreeLid() < stats.getHighestUsedLid(); +} + +DocumentMetaData +LidSpaceCompactionJobBase::getNextDocument(const LidUsageStats &stats, bool retryLastDocument) +{ + return _scanItr->next(std::max(stats.getLowestFreeLid(), stats.getUsedLids()), retryLastDocument); +} + +bool +LidSpaceCompactionJobBase::scanDocumentsPost() +{ + if (!_scanItr->valid()) { + if (shouldRestartScanDocuments(_handler.getLidStatus())) { + _scanItr = _handler.getIterator(); + } else { + _scanItr = IDocumentScanIterator::UP(); + _shouldCompactLidSpace = true; + } + } + return false; // more work to do (scan documents or compact lid space) +} + +void +LidSpaceCompactionJobBase::compactLidSpace(const LidUsageStats &stats) +{ + uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1; + CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit); + vespalib::Gate gate; + auto commit_result = _opStorer.appendAndCommitOperation(op, std::make_shared(gate)); + gate.await(); + _handler.handleCompactLidSpace(op, std::make_shared>(std::move(commit_result))); + EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit); + _shouldCompactLidSpace = false; +} + +bool +LidSpaceCompactionJobBase::remove_batch_is_ongoing() const +{ + return _ops_rate_tracker->remove_batch_above_threshold(); +} + +bool +LidSpaceCompactionJobBase::remove_is_ongoing() const +{ + return _ops_rate_tracker->remove_above_threshold(); +} + +LidSpaceCompactionJobBase::LidSpaceCompactionJobBase(const DocumentDBLidSpaceCompactionConfig &config, + ILidSpaceCompactionHandler &handler, + IOperationStorer &opStorer, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + bool nodeRetired) + : BlockableMaintenanceJob("lid_space_compaction." + handler.getName(), + config.getDelay(), config.getInterval(), blockableConfig), + _cfg(config), + _handler(handler), + _opStorer(opStorer), + _scanItr(), + _diskMemUsageNotifier(diskMemUsageNotifier), + _clusterStateChangedNotifier(clusterStateChangedNotifier), + _ops_rate_tracker(std::make_shared(config.get_remove_batch_block_rate(), + config.get_remove_block_rate())), + _is_disabled(false), + _shouldCompactLidSpace(false) +{ + _diskMemUsageNotifier.addDiskMemUsageListener(this); + _clusterStateChangedNotifier.addClusterStateChangedHandler(this); + if (nodeRetired) { + setBlocked(BlockedReason::CLUSTER_STATE); + } + handler.set_operation_listener(_ops_rate_tracker); +} + +LidSpaceCompactionJobBase::~LidSpaceCompactionJobBase() +{ + _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); + _diskMemUsageNotifier.removeDiskMemUsageListener(this); +} + +bool +LidSpaceCompactionJobBase::run() +{ + if (isBlocked()) { + return true; // indicate work is done since no work can be done + } + LidUsageStats stats = _handler.getLidStatus(); + if (remove_batch_is_ongoing()) { + // Note that we don't set the job as blocked as the decision to un-block it is not driven externally. + LOG(info, "%s: Lid space compaction is disabled while remove batch (delete buckets) is ongoing", + _handler.getName().c_str()); + _is_disabled = true; + return true; + } + if (remove_is_ongoing()) { + // Note that we don't set the job as blocked as the decision to un-block it is not driven externally. + LOG(info, "%s: Lid space compaction is disabled while remove operations are ongoing", + _handler.getName().c_str()); + _is_disabled = true; + return true; + } + if (_is_disabled) { + LOG(info, "%s: Lid space compaction is re-enabled as remove operations are no longer ongoing", + _handler.getName().c_str()); + _is_disabled = false; + } + if (_scanItr) { + return scanDocuments(stats); + } else if (_shouldCompactLidSpace) { + compactLidSpace(stats); + } else if (hasTooMuchLidBloat(stats)) { + assert(!_scanItr); + _scanItr = _handler.getIterator(); + return scanDocuments(stats); + } + return true; +} + +void +LidSpaceCompactionJobBase::notifyDiskMemUsage(DiskMemUsageState state) +{ + // Called by master write thread + internalNotifyDiskMemUsage(state); +} + +void +LidSpaceCompactionJobBase::notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) +{ + // Called by master write thread + bool nodeRetired = newCalc->nodeRetired(); + if (!nodeRetired) { + if (isBlocked(BlockedReason::CLUSTER_STATE)) { + LOG(info, "%s: Lid space compaction is un-blocked as node is no longer retired", _handler.getName().c_str()); + unBlock(BlockedReason::CLUSTER_STATE); + } + } else { + LOG(info, "%s: Lid space compaction is blocked as node is retired", _handler.getName().c_str()); + setBlocked(BlockedReason::CLUSTER_STATE); + } +} + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h new file mode 100644 index 00000000000..bfce6d55c89 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h @@ -0,0 +1,72 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "blockable_maintenance_job.h" +#include "document_db_maintenance_config.h" +#include "i_disk_mem_usage_listener.h" +#include "iclusterstatechangedhandler.h" +#include + +namespace proton { + class IDiskMemUsageNotifier; + class IClusterStateChangedNotifier; + class IOperationStorer; + class ILidSpaceCompactionHandler; + class IDocumentScanIterator; + class RemoveOperationsRateTracker; +} + +namespace proton { + +/** + * Job that regularly checks whether lid space compaction should be performed + * for the given handler. + * + * Compaction is handled by moving documents from high lids to low free lids. + * A handler is typically working over a single document sub db. + */ +class LidSpaceCompactionJobBase : public BlockableMaintenanceJob, + public IDiskMemUsageListener, + public IClusterStateChangedHandler +{ +private: + const DocumentDBLidSpaceCompactionConfig _cfg; +protected: + ILidSpaceCompactionHandler &_handler; + IOperationStorer &_opStorer; + std::unique_ptr _scanItr; +private: + IDiskMemUsageNotifier &_diskMemUsageNotifier; + IClusterStateChangedNotifier &_clusterStateChangedNotifier; + std::shared_ptr _ops_rate_tracker; + bool _is_disabled; + bool _shouldCompactLidSpace; + + + bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const; + bool shouldRestartScanDocuments(const search::LidUsageStats &stats) const; + virtual bool scanDocuments(const search::LidUsageStats &stats) = 0; + void compactLidSpace(const search::LidUsageStats &stats); + bool remove_batch_is_ongoing() const; + bool remove_is_ongoing() const; +protected: + search::DocumentMetaData getNextDocument(const search::LidUsageStats &stats, bool retryLastDocument); + bool scanDocumentsPost(); +public: + LidSpaceCompactionJobBase(const DocumentDBLidSpaceCompactionConfig &config, + ILidSpaceCompactionHandler &handler, + IOperationStorer &opStorer, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + bool nodeRetired); + ~LidSpaceCompactionJobBase() override; + + void notifyDiskMemUsage(DiskMemUsageState state) override; + void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; + bool run() override; +}; + +} // namespace proton + diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp new file mode 100644 index 00000000000..49a782df7a3 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp @@ -0,0 +1,70 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "lid_space_compaction_job_take2.h" +#include "i_document_scan_iterator.h" +#include "i_lid_space_compaction_handler.h" +#include "i_operation_storer.h" +#include +#include +#include +#include + +using search::DocumentMetaData; +using search::LidUsageStats; +using storage::spi::makeBucketTask; +using storage::spi::Bucket; + +namespace proton::lidspace { + +bool +CompactionJob::scanDocuments(const LidUsageStats &stats) +{ + if (_scanItr->valid()) { + DocumentMetaData document = getNextDocument(stats, false); + if (document.valid()) { + Bucket metaBucket(document::Bucket(document::BucketSpace::placeHolder(), document.bucketId)); + IDestructorCallback::SP context = _moveOpsLimiter->beginOperation(); + auto failed = _bucketExecutor.execute(metaBucket, makeBucketTask([this, meta=document, opsTracker=std::move(context)] (const Bucket & bucket, std::shared_ptr onDone) { + assert(bucket.getBucketId() == meta.bucketId); + using DoneContext = vespalib::KeepAlive>; + moveDocument(meta, std::make_shared(std::make_pair(std::move(opsTracker), std::move(onDone)))); + })); + if (failed) return false; + if (isBlocked(BlockedReason::OUTSTANDING_OPS)) { + return true; + } + } + } + return scanDocumentsPost(); +} + +void +CompactionJob::moveDocument(const search::DocumentMetaData & metaThen, std::shared_ptr context) { + search::DocumentMetaData metaNow = _scanItr->getMetaData(metaThen.lid); + if (metaNow.lid != metaThen.lid) return; + if (metaNow.bucketId != metaThen.bucketId) return; + + MoveOperation::UP op = _handler.createMoveOperation(metaNow, _handler.getLidStatus().getLowestFreeLid()); + if (!op) return; + + _opStorer.appendOperation(*op, context); + _handler.handleMove(*op, std::move(context)); +} + +CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, + ILidSpaceCompactionHandler &handler, + IOperationStorer &opStorer, + BucketExecutor & bucketExecutor, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + bool nodeRetired) + : LidSpaceCompactionJobBase(config, handler, opStorer, diskMemUsageNotifier, + blockableConfig, clusterStateChangedNotifier, nodeRetired), + _bucketExecutor(bucketExecutor) +{ +} + +CompactionJob::~CompactionJob() = default; + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h new file mode 100644 index 00000000000..f79a3adc7d0 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h @@ -0,0 +1,45 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "lid_space_compaction_job_base.h" + +namespace storage::spi { class BucketExecutor;} +namespace proton { + class IDiskMemUsageNotifier; + class IClusterStateChangedNotifier; +} + +namespace proton::lidspace { + +/** + * Job that regularly checks whether lid space compaction should be performed + * for the given handler. + * + * Compaction is handled by moving documents from high lids to low free lids. + * A handler is typically working over a single document sub db. + */ +class CompactionJob : public LidSpaceCompactionJobBase +{ +private: + using BucketExecutor = storage::spi::BucketExecutor; + using IDestructorCallback = vespalib::IDestructorCallback; + BucketExecutor &_bucketExecutor; + + bool scanDocuments(const search::LidUsageStats &stats) override; + void moveDocument(const search::DocumentMetaData & meta, std::shared_ptr onDone); + +public: + CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, + ILidSpaceCompactionHandler &handler, + IOperationStorer &opStorer, + BucketExecutor & bucketExecutor, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + bool nodeRetired); + ~CompactionJob() override; +}; + +} // namespace proton + -- cgit v1.2.3