diff options
| | | |
|---|---|---|
| author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-05-04 12:37:50 +0000 |
| committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-05-04 17:55:22 +0000 |
| commit | 37444bd2ca6ab9e6245f42d3c0efebee00ee5210 (patch) | |
| tree | f4834e3a8c0b9e79546fe4905c476c88f749d748 /searchcore | |
| parent | 90bfeda99d269486682fd5a82f73d99ced6aedd1 (diff) | |
Collapse LidSpaceCompactionJobBase into CompactionJob.
Diffstat (limited to 'searchcore')
5 files changed, 192 insertions, 262 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index 78e04c066f3..75d7dc0cb59 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -57,7 +57,6 @@ vespa_add_library(searchcore_server STATIC ireplayconfig.cpp job_tracked_maintenance_job.cpp lid_space_compaction_handler.cpp - lid_space_compaction_job_base.cpp lid_space_compaction_job_take2.cpp maintenance_controller_explorer.cpp maintenance_jobs_injector.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp deleted file mode 100644 index 225d128f9bf..00000000000 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp +++ /dev/null @@ -1,177 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
- -#include "lid_space_compaction_job_base.h" -#include "imaintenancejobrunner.h" -#include "i_document_scan_iterator.h" -#include "i_lid_space_compaction_handler.h" -#include "i_operation_storer.h" -#include "remove_operations_rate_tracker.h" -#include "i_disk_mem_usage_notifier.h" -#include "iclusterstatechangednotifier.h" -#include <vespa/searchcore/proton/feedoperation/compact_lid_space_operation.h> -#include <vespa/searchcore/proton/common/eventlogger.h> -#include <vespa/vespalib/util/destructor_callbacks.h> -#include <vespa/vespalib/util/gate.h> -#include <cassert> - -#include <vespa/log/log.h> -LOG_SETUP(".proton.server.lid_space_compaction_job"); - -using search::DocumentMetaData; -using search::LidUsageStats; - -namespace proton { - -bool -LidSpaceCompactionJobBase::hasTooMuchLidBloat(const LidUsageStats &stats) const -{ - return ((stats.getLidBloat() >= _cfg.getAllowedLidBloat()) && - (stats.getLidBloatFactor() >= _cfg.getAllowedLidBloatFactor()) && - (stats.getLidLimit() > stats.getLowestFreeLid())); -} - -bool -LidSpaceCompactionJobBase::shouldRestartScanDocuments(const LidUsageStats &stats) const -{ - return ((stats.getUsedLids() + _cfg.getAllowedLidBloat()) < stats.getHighestUsedLid()) && - (stats.getLowestFreeLid() < stats.getHighestUsedLid()); -} - -DocumentMetaData -LidSpaceCompactionJobBase::getNextDocument(const LidUsageStats &stats, bool retryLastDocument) -{ - return _scanItr->next(std::max(stats.getLowestFreeLid(), stats.getUsedLids()), retryLastDocument); -} - -void -LidSpaceCompactionJobBase::compactLidSpace(const LidUsageStats &stats) -{ - uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1; - CompactLidSpaceOperation op(_handler->getSubDbId(), wantedLidLimit); - vespalib::Gate gate; - auto commit_result = _opStorer.appendAndCommitOperation(op, std::make_shared<vespalib::GateCallback>(gate)); - gate.await(); - _handler->handleCompactLidSpace(op, std::make_shared<vespalib::KeepAlive<decltype(commit_result)>>(std::move(commit_result))); - 
EventLogger::lidSpaceCompactionComplete(_handler->getName(), wantedLidLimit); - _shouldCompactLidSpace = false; -} - -bool -LidSpaceCompactionJobBase::remove_batch_is_ongoing() const -{ - return _ops_rate_tracker->remove_batch_above_threshold(); -} - -bool -LidSpaceCompactionJobBase::remove_is_ongoing() const -{ - return _ops_rate_tracker->remove_above_threshold(); -} - -LidSpaceCompactionJobBase::LidSpaceCompactionJobBase(const DocumentDBLidSpaceCompactionConfig &config, - std::shared_ptr<ILidSpaceCompactionHandler> handler, - IOperationStorer &opStorer, - IDiskMemUsageNotifier &diskMemUsageNotifier, - const BlockableMaintenanceJobConfig &blockableConfig, - IClusterStateChangedNotifier &clusterStateChangedNotifier, - bool nodeRetired) - : BlockableMaintenanceJob("lid_space_compaction." + handler->getName(), - config.getDelay(), config.getInterval(), blockableConfig), - _cfg(config), - _handler(std::move(handler)), - _opStorer(opStorer), - _scanItr(), - _diskMemUsageNotifier(diskMemUsageNotifier), - _clusterStateChangedNotifier(clusterStateChangedNotifier), - _ops_rate_tracker(std::make_shared<RemoveOperationsRateTracker>(config.get_remove_batch_block_rate(), - config.get_remove_block_rate())), - _is_disabled(false), - _shouldCompactLidSpace(false) -{ - _diskMemUsageNotifier.addDiskMemUsageListener(this); - _clusterStateChangedNotifier.addClusterStateChangedHandler(this); - if (nodeRetired) { - setBlocked(BlockedReason::CLUSTER_STATE); - } - _handler->set_operation_listener(_ops_rate_tracker); -} - -LidSpaceCompactionJobBase::~LidSpaceCompactionJobBase() -{ - _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); - _diskMemUsageNotifier.removeDiskMemUsageListener(this); -} - -bool -LidSpaceCompactionJobBase::run() -{ - if (isBlocked()) { - return true; // indicate work is done since no work can be done - } - LidUsageStats stats = _handler->getLidStatus(); - if (remove_batch_is_ongoing()) { - // Note that we don't set the job as blocked as the decision 
to un-block it is not driven externally. - LOG(info, "%s: Lid space compaction is disabled while remove batch (delete buckets) is ongoing", - _handler->getName().c_str()); - _is_disabled = true; - return true; - } - if (remove_is_ongoing()) { - // Note that we don't set the job as blocked as the decision to un-block it is not driven externally. - LOG(info, "%s: Lid space compaction is disabled while remove operations are ongoing", - _handler->getName().c_str()); - _is_disabled = true; - return true; - } - if (_is_disabled) { - LOG(info, "%s: Lid space compaction is re-enabled as remove operations are no longer ongoing", - _handler->getName().c_str()); - _is_disabled = false; - } - - if (_scanItr && !_scanItr->valid()) { - if (shouldRestartScanDocuments(_handler->getLidStatus())) { - _scanItr = _handler->getIterator(); - } else { - _scanItr = IDocumentScanIterator::UP(); - _shouldCompactLidSpace = true; - return false; - } - } - - if (_scanItr) { - return scanDocuments(stats); - } else if (_shouldCompactLidSpace) { - compactLidSpace(stats); - } else if (hasTooMuchLidBloat(stats)) { - assert(!_scanItr); - _scanItr = _handler->getIterator(); - return scanDocuments(stats); - } - return true; -} - -void -LidSpaceCompactionJobBase::notifyDiskMemUsage(DiskMemUsageState state) -{ - // Called by master write thread - internalNotifyDiskMemUsage(state); -} - -void -LidSpaceCompactionJobBase::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc) -{ - // Called by master write thread - bool nodeRetired = newCalc->nodeRetired(); - if (!nodeRetired) { - if (isBlocked(BlockedReason::CLUSTER_STATE)) { - LOG(info, "%s: Lid space compaction is un-blocked as node is no longer retired", _handler->getName().c_str()); - unBlock(BlockedReason::CLUSTER_STATE); - } - } else { - LOG(info, "%s: Lid space compaction is blocked as node is retired", _handler->getName().c_str()); - setBlocked(BlockedReason::CLUSTER_STATE); - } -} - -} // namespace proton diff --git 
a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h deleted file mode 100644 index d127025c496..00000000000 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "blockable_maintenance_job.h" -#include "document_db_maintenance_config.h" -#include "i_disk_mem_usage_listener.h" -#include "iclusterstatechangedhandler.h" -#include <vespa/searchlib/common/idocumentmetastore.h> - -namespace proton { - class IDiskMemUsageNotifier; - class IClusterStateChangedNotifier; - struct IOperationStorer; - struct ILidSpaceCompactionHandler; - struct IDocumentScanIterator; - class RemoveOperationsRateTracker; -} - -namespace proton { - -/** - * This is a base class for moving documents from a high lid to a lower free - * lid in order to keep the lid space compact. - * - * Compaction is handled by moving documents from high lids to low free lids. - * A handler is typically working over a single document sub db. 
- */ -class LidSpaceCompactionJobBase : public BlockableMaintenanceJob, - public IDiskMemUsageListener, - public IClusterStateChangedHandler -{ -private: - const DocumentDBLidSpaceCompactionConfig _cfg; -protected: - std::shared_ptr<ILidSpaceCompactionHandler> _handler; - IOperationStorer &_opStorer; - std::unique_ptr<IDocumentScanIterator> _scanItr; -private: - IDiskMemUsageNotifier &_diskMemUsageNotifier; - IClusterStateChangedNotifier &_clusterStateChangedNotifier; - std::shared_ptr<RemoveOperationsRateTracker> _ops_rate_tracker; - bool _is_disabled; - bool _shouldCompactLidSpace; - - - bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const; - bool shouldRestartScanDocuments(const search::LidUsageStats &stats) const; - virtual bool scanDocuments(const search::LidUsageStats &stats) = 0; - void compactLidSpace(const search::LidUsageStats &stats); - bool remove_batch_is_ongoing() const; - bool remove_is_ongoing() const; -protected: - search::DocumentMetaData getNextDocument(const search::LidUsageStats &stats, bool retryLastDocument); -public: - LidSpaceCompactionJobBase(const DocumentDBLidSpaceCompactionConfig &config, - std::shared_ptr<ILidSpaceCompactionHandler> handler, - IOperationStorer &opStorer, - IDiskMemUsageNotifier &diskMemUsageNotifier, - const BlockableMaintenanceJobConfig &blockableConfig, - IClusterStateChangedNotifier &clusterStateChangedNotifier, - bool nodeRetired); - ~LidSpaceCompactionJobBase() override; - - void notifyDiskMemUsage(DiskMemUsageState state) override; - void notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc) override; - bool run() override; -}; - -} // namespace proton - diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp index 6ac8f803800..f2333f2f2b7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp +++ 
b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp @@ -4,15 +4,24 @@ #include "i_document_scan_iterator.h" #include "i_lid_space_compaction_handler.h" #include "i_operation_storer.h" +#include "i_disk_mem_usage_notifier.h" +#include "iclusterstatechangednotifier.h" +#include "remove_operations_rate_tracker.h" +#include <vespa/searchcore/proton/feedoperation/compact_lid_space_operation.h> #include <vespa/searchcore/proton/feedoperation/moveoperation.h> +#include <vespa/searchcore/proton/common/eventlogger.h> #include <vespa/searchcorespi/index/i_thread_service.h> #include <vespa/persistence/spi/bucket_tasks.h> #include <vespa/document/fieldvalue/document.h> #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/gate.h> #include <cassert> #include <thread> +#include <vespa/log/log.h> +LOG_SETUP(".proton.server.lidspace.compactionjob"); + using search::DocumentMetaData; using search::LidUsageStats; using storage::spi::makeBucketTask; @@ -124,17 +133,39 @@ CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, IClusterStateChangedNotifier &clusterStateChangedNotifier, bool nodeRetired, document::BucketSpace bucketSpace) - : LidSpaceCompactionJobBase(config, std::move(handler), opStorer, diskMemUsageNotifier, - blockableConfig, clusterStateChangedNotifier, nodeRetired), + : BlockableMaintenanceJob("lid_space_compaction." 
+ handler->getName(), + config.getDelay(), config.getInterval(), blockableConfig), + IDiskMemUsageListener(), + IClusterStateChangedHandler(), std::enable_shared_from_this<CompactionJob>(), + _cfg(config), + _handler(std::move(handler)), + _opStorer(opStorer), + _scanItr(), + _diskMemUsageNotifier(diskMemUsageNotifier), + _clusterStateChangedNotifier(clusterStateChangedNotifier), + _ops_rate_tracker(std::make_shared<RemoveOperationsRateTracker>(config.get_remove_batch_block_rate(), + config.get_remove_block_rate())), + _is_disabled(false), + _shouldCompactLidSpace(false), _master(master), _bucketExecutor(bucketExecutor), _dbRetainer(std::move(dbRetainer)), _bucketSpace(bucketSpace), _stopped(false) -{ } +{ + _diskMemUsageNotifier.addDiskMemUsageListener(this); + _clusterStateChangedNotifier.addClusterStateChangedHandler(this); + if (nodeRetired) { + setBlocked(BlockedReason::CLUSTER_STATE); + } + _handler->set_operation_listener(_ops_rate_tracker); +} -CompactionJob::~CompactionJob() = default; +CompactionJob::~CompactionJob() { + _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); + _diskMemUsageNotifier.removeDiskMemUsageListener(this); +} std::shared_ptr<CompactionJob> CompactionJob::create(const DocumentDBLidSpaceCompactionConfig &config, @@ -164,4 +195,122 @@ CompactionJob::onStop() { _stopped = true; } +DocumentMetaData +CompactionJob::getNextDocument(const LidUsageStats &stats, bool retryLastDocument) +{ + return _scanItr->next(std::max(stats.getLowestFreeLid(), stats.getUsedLids()), retryLastDocument); +} + +bool +CompactionJob::run() +{ + if (isBlocked()) { + return true; // indicate work is done since no work can be done + } + LidUsageStats stats = _handler->getLidStatus(); + if (remove_batch_is_ongoing()) { + // Note that we don't set the job as blocked as the decision to un-block it is not driven externally. 
+ LOG(info, "%s: Lid space compaction is disabled while remove batch (delete buckets) is ongoing", + _handler->getName().c_str()); + _is_disabled = true; + return true; + } + if (remove_is_ongoing()) { + // Note that we don't set the job as blocked as the decision to un-block it is not driven externally. + LOG(info, "%s: Lid space compaction is disabled while remove operations are ongoing", + _handler->getName().c_str()); + _is_disabled = true; + return true; + } + if (_is_disabled) { + LOG(info, "%s: Lid space compaction is re-enabled as remove operations are no longer ongoing", + _handler->getName().c_str()); + _is_disabled = false; + } + + if (_scanItr && !_scanItr->valid()) { + if (shouldRestartScanDocuments(_handler->getLidStatus())) { + _scanItr = _handler->getIterator(); + } else { + _scanItr = IDocumentScanIterator::UP(); + _shouldCompactLidSpace = true; + return false; + } + } + + if (_scanItr) { + return scanDocuments(stats); + } else if (_shouldCompactLidSpace) { + compactLidSpace(stats); + } else if (hasTooMuchLidBloat(stats)) { + assert(!_scanItr); + _scanItr = _handler->getIterator(); + return scanDocuments(stats); + } + return true; +} + +bool +CompactionJob::remove_batch_is_ongoing() const +{ + return _ops_rate_tracker->remove_batch_above_threshold(); +} + +bool +CompactionJob::remove_is_ongoing() const +{ + return _ops_rate_tracker->remove_above_threshold(); +} + +bool +CompactionJob::hasTooMuchLidBloat(const LidUsageStats &stats) const +{ + return ((stats.getLidBloat() >= _cfg.getAllowedLidBloat()) && + (stats.getLidBloatFactor() >= _cfg.getAllowedLidBloatFactor()) && + (stats.getLidLimit() > stats.getLowestFreeLid())); +} + +bool +CompactionJob::shouldRestartScanDocuments(const LidUsageStats &stats) const +{ + return ((stats.getUsedLids() + _cfg.getAllowedLidBloat()) < stats.getHighestUsedLid()) && + (stats.getLowestFreeLid() < stats.getHighestUsedLid()); +} + +void +CompactionJob::compactLidSpace(const LidUsageStats &stats) +{ + uint32_t 
wantedLidLimit = stats.getHighestUsedLid() + 1; + CompactLidSpaceOperation op(_handler->getSubDbId(), wantedLidLimit); + vespalib::Gate gate; + auto commit_result = _opStorer.appendAndCommitOperation(op, std::make_shared<vespalib::GateCallback>(gate)); + gate.await(); + _handler->handleCompactLidSpace(op, std::make_shared<vespalib::KeepAlive<decltype(commit_result)>>(std::move(commit_result))); + EventLogger::lidSpaceCompactionComplete(_handler->getName(), wantedLidLimit); + _shouldCompactLidSpace = false; +} + +void +CompactionJob::notifyDiskMemUsage(DiskMemUsageState state) +{ + // Called by master write thread + internalNotifyDiskMemUsage(state); +} + +void +CompactionJob::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc) +{ + // Called by master write thread + bool nodeRetired = newCalc->nodeRetired(); + if (!nodeRetired) { + if (isBlocked(BlockedReason::CLUSTER_STATE)) { + LOG(info, "%s: Lid space compaction is un-blocked as node is no longer retired", _handler->getName().c_str()); + unBlock(BlockedReason::CLUSTER_STATE); + } + } else { + LOG(info, "%s: Lid space compaction is blocked as node is retired", _handler->getName().c_str()); + setBlocked(BlockedReason::CLUSTER_STATE); + } +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h index aa72d2e84bc..141f29f4510 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h @@ -2,7 +2,11 @@ #pragma once -#include "lid_space_compaction_job_base.h" +#include "blockable_maintenance_job.h" +#include "document_db_maintenance_config.h" +#include "i_disk_mem_usage_listener.h" +#include "iclusterstatechangedhandler.h" +#include <vespa/searchlib/common/idocumentmetastore.h> #include 
<vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/document/bucket/bucketspace.h> #include <atomic> @@ -11,9 +15,13 @@ namespace storage::spi { struct BucketExecutor; } namespace searchcorespi::index { struct IThreadService; } namespace vespalib { class IDestructorCallback; } namespace proton { + class MoveOperation; class IDiskMemUsageNotifier; class IClusterStateChangedNotifier; - class MoveOperation; + struct IOperationStorer; + struct ILidSpaceCompactionHandler; + struct IDocumentScanIterator; + class RemoveOperationsRateTracker; } namespace proton::lidspace { @@ -22,19 +30,38 @@ namespace proton::lidspace { * Moves documents from higher lids to lower lids. It uses a BucketExecutor that ensures that the bucket * is locked for changes while the document is moved. */ -class CompactionJob : public LidSpaceCompactionJobBase, public std::enable_shared_from_this<CompactionJob> +class CompactionJob : public BlockableMaintenanceJob, + public IDiskMemUsageListener, + public IClusterStateChangedHandler, + public std::enable_shared_from_this<CompactionJob> { private: using BucketExecutor = storage::spi::BucketExecutor; using IDestructorCallback = vespalib::IDestructorCallback; using IThreadService = searchcorespi::index::IThreadService; - IThreadService &_master; - BucketExecutor &_bucketExecutor; - RetainGuard _dbRetainer; - document::BucketSpace _bucketSpace; - std::atomic<bool> _stopped; + const DocumentDBLidSpaceCompactionConfig _cfg; + std::shared_ptr<ILidSpaceCompactionHandler> _handler; + IOperationStorer &_opStorer; + std::unique_ptr<IDocumentScanIterator> _scanItr; + IDiskMemUsageNotifier &_diskMemUsageNotifier; + IClusterStateChangedNotifier &_clusterStateChangedNotifier; + std::shared_ptr<RemoveOperationsRateTracker> _ops_rate_tracker; + bool _is_disabled; + bool _shouldCompactLidSpace; + IThreadService &_master; + BucketExecutor &_bucketExecutor; + RetainGuard _dbRetainer; + document::BucketSpace _bucketSpace; + std::atomic<bool> _stopped; + 
+ bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const; + bool shouldRestartScanDocuments(const search::LidUsageStats &stats) const; + void compactLidSpace(const search::LidUsageStats &stats); + bool remove_batch_is_ongoing() const; + bool remove_is_ongoing() const; + search::DocumentMetaData getNextDocument(const search::LidUsageStats &stats, bool retryLastDocument); - bool scanDocuments(const search::LidUsageStats &stats) override; + bool scanDocuments(const search::LidUsageStats &stats); static void moveDocument(std::shared_ptr<CompactionJob> job, const search::DocumentMetaData & metaThen, std::shared_ptr<IDestructorCallback> onDone); void completeMove(const search::DocumentMetaData & metaThen, std::unique_ptr<MoveOperation> moveOp, @@ -67,6 +94,9 @@ public: bool nodeRetired, document::BucketSpace bucketSpace); ~CompactionJob() override; + void notifyDiskMemUsage(DiskMemUsageState state) override; + void notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc) override; + bool run() override; }; } // namespace proton |