diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2017-06-21 14:17:33 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-21 14:17:33 +0000 |
commit | c8ec2484db4a81b49ec628e0eb2d091a209c1297 (patch) | |
tree | 51addc27fe3d0c554557afe2d89c22bcfb6761e6 | |
parent | ee5eda836f6850f5f81811a5f6b51dd91f3614fa (diff) | |
parent | 172ec0ad9ae3f49569c8f5151459d77f3a1a4916 (diff) |
Merge pull request #2850 from yahoo/geirst/disable-lid-space-compaction-job-when-node-retired
Geirst/disable lid space compaction job when node retired
5 files changed, 178 insertions, 129 deletions
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/lid_space_compaction/CMakeLists.txt index 593c406f8c4..f4d4164a647 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/CMakeLists.txt +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_executable(searchcore_lid_space_compaction_test_app TEST SOURCES lid_space_compaction_test.cpp DEPENDS + searchcore_test searchcore_server searchcore_initializer searchcore_feedoperation diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp index ea311e00e79..f2f0029b924 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp @@ -5,6 +5,8 @@ LOG_SETUP("lid_space_compaction_test"); #include <vespa/searchcore/proton/server/i_lid_space_compaction_handler.h> #include <vespa/searchcore/proton/server/lid_space_compaction_handler.h> #include <vespa/searchcore/proton/server/lid_space_compaction_job.h> +#include <vespa/searchcore/proton/test/clusterstatehandler.h> +#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h> #include <vespa/searchcore/proton/test/test.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/vespalib/testkit/testapp.h> @@ -146,36 +148,6 @@ struct MyFrozenBucketHandler : public IFrozenBucketHandler virtual void removeListener(IBucketFreezeListener *) override { } }; -struct MyDiskMemUsageNotifier : public IDiskMemUsageNotifier -{ - DiskMemUsageState _state; - IDiskMemUsageListener *_listener; - MyDiskMemUsageNotifier() - : _state(), - _listener(nullptr) - { - } - ~MyDiskMemUsageNotifier() - { - assert(_listener == nullptr); - } - virtual void addDiskMemUsageListener(IDiskMemUsageListener *listener) override - { - assert(_listener == nullptr); - _listener = listener; - listener->notifyDiskMemUsage(_state); - } - virtual void removeDiskMemUsageListener(IDiskMemUsageListener *listener) override - { - assert(listener == _listener); - _listener = nullptr; - } - void update(DiskMemUsageState state) { - _state = state; - _listener->notifyDiskMemUsage(_state); - } -}; - struct MyFeedView : public test::DummyFeedView { MyFeedView(const DocumentTypeRepo::SP &repo) @@ -238,18 +210,21 @@ struct JobFixture MyHandler _handler; MyStorer _storer; MyFrozenBucketHandler _frozenHandler; - MyDiskMemUsageNotifier _diskMemUsageNotifier; + test::DiskMemUsageNotifier _diskMemUsageNotifier; + test::ClusterStateHandler _clusterStateHandler; LidSpaceCompactionJob _job; MyJobRunner _jobRunner; JobFixture(uint32_t allowedLidBloat = ALLOWED_LID_BLOAT, double allowedLidBloatFactor = ALLOWED_LID_BLOAT_FACTOR, uint32_t maxDocsToScan = MAX_DOCS_TO_SCAN, double resourceLimitFactor = RESOURCE_LIMIT_FACTOR, - double interval = JOB_DELAY) + double interval = JOB_DELAY, + bool nodeRetired = false) : _handler(), _job(DocumentDBLidSpaceCompactionConfig(interval, allowedLidBloat, allowedLidBloatFactor, false, maxDocsToScan), - _handler, _storer, _frozenHandler, _diskMemUsageNotifier, resourceLimitFactor), + _handler, _storer, _frozenHandler, _diskMemUsageNotifier, resourceLimitFactor, + _clusterStateHandler, nodeRetired), _jobRunner(_job) { } @@ -291,6 +266,38 @@ struct JobFixture EXPECT_TRUE(run()); return *this; } + void notifyNodeRetired(bool nodeRetired) { + test::BucketStateCalculator::SP calc = std::make_shared<test::BucketStateCalculator>(); + calc->setNodeRetired(nodeRetired); + _clusterStateHandler.notifyClusterStateChanged(calc); + } + void assertJobContext(uint32_t moveToLid, + uint32_t moveFromLid, + uint32_t handleMoveCnt, + uint32_t wantedLidLimit, + uint32_t compactStoreCnt) + { + EXPECT_EQUAL(moveToLid, _handler._moveToLid); + EXPECT_EQUAL(moveFromLid, _handler._moveFromLid); + EXPECT_EQUAL(handleMoveCnt, _handler._handleMoveCnt); + EXPECT_EQUAL(handleMoveCnt, _storer._moveCnt); + EXPECT_EQUAL(wantedLidLimit, _handler._wantedLidLimit); + EXPECT_EQUAL(compactStoreCnt, _storer._compactCnt); + } + void assertNoWorkDone() { + assertJobContext(0, 0, 0, 0, 0); + } + JobFixture &setupOneDocumentToCompact() { + addStats(10, {1,3,4,5,6,9}, + {{9,2}, // 30% bloat: move 9 -> 2 + {6,7}}); // no documents to move + return *this; + } + void assertOneDocumentCompacted() { + TEST_DO(assertJobContext(2, 9, 1, 0, 0)); + TEST_DO(endScan().compact()); + TEST_DO(assertJobContext(2, 9, 1, 7, 1)); + } }; JobFixture::~JobFixture() { @@ -316,23 +323,6 @@ struct HandlerFixture } }; -bool -assertJobContext(uint32_t moveToLid, - uint32_t moveFromLid, - uint32_t handleMoveCnt, - uint32_t wantedLidLimit, - uint32_t compactStoreCnt, - const JobFixture &f) -{ - if (!EXPECT_EQUAL(moveToLid, f._handler._moveToLid)) return false; - if (!EXPECT_EQUAL(moveFromLid, f._handler._moveFromLid)) return false; - if (!EXPECT_EQUAL(handleMoveCnt, f._handler._handleMoveCnt)) return false; - if (!EXPECT_EQUAL(handleMoveCnt, f._storer._moveCnt)) return false; - if (!EXPECT_EQUAL(wantedLidLimit, f._handler._wantedLidLimit)) return false; - if (!EXPECT_EQUAL(compactStoreCnt, f._storer._compactCnt)) return false; - return true; -} - TEST_F("require that handler name is used as part of job name", JobFixture) { EXPECT_EQUAL("lid_space_compaction.myhandler", f._job.getName()); @@ -343,7 +333,7 @@ TEST_F("require that no move operation is created if lid bloat factor is below l // 20% bloat < 30% allowed bloat f.addStats(10, {1,3,4,5,6,7,9}, {{9,2}}); EXPECT_TRUE(f.run()); - EXPECT_TRUE(assertJobContext(0, 0, 0, 0, 0, f)); + TEST_DO(f.assertNoWorkDone()); } TEST("require that no move operation is created if lid bloat is below limit") @@ -352,7 +342,7 @@ TEST("require that no move operation is created if lid bloat is below limit") // 20% bloat >= 10% allowed bloat BUT lid bloat (2) < allowed lid bloat (3) f.addStats(10, {1,3,4,5,6,7,9}, {{9,2}}); EXPECT_TRUE(f.run()); - EXPECT_TRUE(assertJobContext(0, 0, 0, 0, 0, f)); + TEST_DO(f.assertNoWorkDone()); } TEST_F("require that no move operation is created and compaction is initiated", JobFixture) @@ -362,19 +352,14 @@ TEST_F("require that no move operation is created and compaction is initiated", // must scan to find that no documents should be moved f.endScan().compact(); - EXPECT_TRUE(assertJobContext(0, 0, 0, 7, 1, f)); + TEST_DO(f.assertJobContext(0, 0, 0, 7, 1)); } TEST_F("require that 1 move operation is created and compaction is initiated", JobFixture) { - f.addStats(10, {1,3,4,5,6,9}, - {{9,2}, // 30% bloat: move 9 -> 2 - {6,7}}); // no documents to move - + f.setupOneDocumentToCompact(); EXPECT_FALSE(f.run()); // scan - EXPECT_TRUE(assertJobContext(2, 9, 1, 0, 0, f)); - f.endScan().compact(); - EXPECT_TRUE(assertJobContext(2, 9, 1, 7, 1, f)); + TEST_DO(f.assertOneDocumentCompacted()); } TEST_F("require that job returns false when multiple move operations or compaction are needed", @@ -387,13 +372,13 @@ TEST_F("require that job returns false when multiple move operations or compacti {6,7}}); // no documents to move EXPECT_FALSE(f.run()); - EXPECT_TRUE(assertJobContext(2, 9, 1, 0, 0, f)); + TEST_DO(f.assertJobContext(2, 9, 1, 0, 0)); EXPECT_FALSE(f.run()); - EXPECT_TRUE(assertJobContext(3, 8, 2, 0, 0, f)); + TEST_DO(f.assertJobContext(3, 8, 2, 0, 0)); EXPECT_FALSE(f.run()); - EXPECT_TRUE(assertJobContext(4, 7, 3, 0, 0, f)); + TEST_DO(f.assertJobContext(4, 7, 3, 0, 0)); f.endScan().compact(); - EXPECT_TRUE(assertJobContext(4, 7, 3, 7, 1, f)); + TEST_DO(f.assertJobContext(4, 7, 3, 7, 1)); } TEST_F("require that job is blocked if trying to move document for frozen bucket", JobFixture) @@ -404,30 +389,25 @@ TEST_F("require that job is blocked if trying to move document for frozen bucket f.addStats(0, 0, 0, 0); EXPECT_TRUE(f.run()); // bucket frozen - EXPECT_TRUE(assertJobContext(0, 0, 0, 0, 0, f)); + TEST_DO(f.assertNoWorkDone()); EXPECT_TRUE(f._job.isBlocked()); f._frozenHandler._bucket = BUCKET_ID_2; f._job.unBlock(); EXPECT_FALSE(f.run()); // unblocked - EXPECT_TRUE(assertJobContext(2, 9, 1, 0, 0, f)); + TEST_DO(f.assertJobContext(2, 9, 1, 0, 0)); EXPECT_FALSE(f._job.isBlocked()); } TEST_F("require that job handles invalid document meta data when max docs are scanned", JobFixture(ALLOWED_LID_BLOAT, ALLOWED_LID_BLOAT_FACTOR, 3)) { - f.addStats(10, {1,3,4,5,6,9}, - {{9,2}, // 30% bloat: move 9 -> 2 - {6,7}}); // no documents to move - + f.setupOneDocumentToCompact(); EXPECT_FALSE(f.run()); // does not find 9 in first scan - EXPECT_TRUE(assertJobContext(0, 0, 0, 0, 0, f)); + TEST_DO(f.assertNoWorkDone()); EXPECT_FALSE(f.run()); // move 9 -> 2 - EXPECT_TRUE(assertJobContext(2, 9, 1, 0, 0, f)); - f.endScan().compact(); - EXPECT_TRUE(assertJobContext(2, 9, 1, 7, 1, f)); + TEST_DO(f.assertOneDocumentCompacted()); } TEST_F("require that job can restart documents scan if lid bloat is still to large", @@ -443,13 +423,13 @@ TEST_F("require that job can restart documents scan if lid bloat is still to lar // We simulate that the set of used docs have changed between these 2 runs EXPECT_FALSE(f.run()); // move 9 -> 2 f.endScan(); - EXPECT_TRUE(assertJobContext(2, 9, 1, 0, 0, f)); + TEST_DO(f.assertJobContext(2, 9, 1, 0, 0)); EXPECT_EQUAL(2u, f._handler._iteratorCnt); EXPECT_FALSE(f.run()); // does not find 8 in first scan EXPECT_FALSE(f.run()); // move 8 -> 3 - EXPECT_TRUE(assertJobContext(3, 8, 2, 0, 0, f)); + TEST_DO(f.assertJobContext(3, 8, 2, 0, 0)); f.endScan().compact(); - EXPECT_TRUE(assertJobContext(3, 8, 2, 7, 1, f)); + TEST_DO(f.assertJobContext(3, 8, 2, 7, 1)); } TEST_F("require that handler uses doctype and subdb name", HandlerFixture) @@ -481,7 +461,7 @@ TEST_F("require that held lid is not considered free, blocks job", JobFixture) // Lid 1 on hold or pendingHold, i.e. neither free nor used. f.addMultiStats(3, {{2}}, {{2, 3}}); EXPECT_TRUE(f.run()); - EXPECT_TRUE(assertJobContext(0, 0, 0, 0, 0, f)); + TEST_DO(f.assertNoWorkDone()); } TEST_F("require that held lid is not considered free, only compact", JobFixture) @@ -489,9 +469,9 @@ TEST_F("require that held lid is not considered free, only compact", JobFixture) // Lid 1 on hold or pendingHold, i.e. neither free nor used. f.addMultiStats(10, {{2}}, {{2, 3}}); EXPECT_FALSE(f.run()); - EXPECT_TRUE(assertJobContext(0, 0, 0, 0, 0, f)); + TEST_DO(f.assertNoWorkDone()); f.compact(); - EXPECT_TRUE(assertJobContext(0, 0, 0, 3, 1, f)); + TEST_DO(f.assertJobContext(0, 0, 0, 3, 1)); } TEST_F("require that held lids are not considered free, one move", JobFixture) @@ -499,45 +479,35 @@ TEST_F("require that held lids are not considered free, one move", JobFixture) // Lids 1,2,3 on hold or pendingHold, i.e. neither free nor used. f.addMultiStats(10, {{5}}, {{5, 4}, {4, 5}}); EXPECT_FALSE(f.run()); - EXPECT_TRUE(assertJobContext(4, 5, 1, 0, 0, f)); + TEST_DO(f.assertJobContext(4, 5, 1, 0, 0)); f.endScan().compact(); - EXPECT_TRUE(assertJobContext(4, 5, 1, 5, 1, f)); + TEST_DO(f.assertJobContext(4, 5, 1, 5, 1)); } TEST_F("require that resource starvation blocks lid space compaction", JobFixture) { - f.addStats(10, {1,3,4,5,6,9}, - {{9,2}, // 30% bloat: move 9 -> 2 - {6,7}}); // no documents to move - TEST_DO(f._diskMemUsageNotifier.update({{100, 0}, {100, 101}})); + f.setupOneDocumentToCompact(); + TEST_DO(f._diskMemUsageNotifier.notify({{100, 0}, {100, 101}})); EXPECT_TRUE(f.run()); // scan - TEST_DO(assertJobContext(0, 0, 0, 0, 0, f)); + TEST_DO(f.assertNoWorkDone()); } TEST_F("require that ending resource starvation resumes lid space compaction", JobFixture) { - f.addStats(10, {1,3,4,5,6,9}, - {{9,2}, // 30% bloat: move 9 -> 2 - {6,7}}); // no documents to move - TEST_DO(f._diskMemUsageNotifier.update({{100, 0}, {100, 101}})); + f.setupOneDocumentToCompact(); + TEST_DO(f._diskMemUsageNotifier.notify({{100, 0}, {100, 101}})); EXPECT_TRUE(f.run()); // scan - TEST_DO(assertJobContext(0, 0, 0, 0, 0, f)); - TEST_DO(f._diskMemUsageNotifier.update({{100, 0}, {100, 0}})); - TEST_DO(assertJobContext(2, 9, 1, 0, 0, f)); - TEST_DO(f.endScan().compact()); - TEST_DO(assertJobContext(2, 9, 1, 7, 1, f)); + TEST_DO(f.assertNoWorkDone()); + TEST_DO(f._diskMemUsageNotifier.notify({{100, 0}, {100, 0}})); + TEST_DO(f.assertOneDocumentCompacted()); } TEST_F("require that resource limit factor adjusts limit", JobFixture(ALLOWED_LID_BLOAT, ALLOWED_LID_BLOAT_FACTOR, MAX_DOCS_TO_SCAN, 1.05)) { - f.addStats(10, {1,3,4,5,6,9}, - {{9,2}, // 30% bloat: move 9 -> 2 - {6,7}}); // no documents to move - TEST_DO(f._diskMemUsageNotifier.update({{100, 0}, {100, 101}})); + f.setupOneDocumentToCompact(); + TEST_DO(f._diskMemUsageNotifier.notify({{100, 0}, {100, 101}})); EXPECT_FALSE(f.run()); // scan - TEST_DO(assertJobContext(2, 9, 1, 0, 0, f)); - TEST_DO(f.endScan().compact()); - TEST_DO(assertJobContext(2, 9, 1, 7, 1, f)); + TEST_DO(f.assertOneDocumentCompacted()); } struct JobFixtureWithInterval : public JobFixture { @@ -558,6 +528,36 @@ TEST_F("require that delay is set based on interval and can be less than 300 sec EXPECT_EQUAL(299, f._job.getInterval()); } +struct JobFixtureWithNodeRetired : public JobFixture { + JobFixtureWithNodeRetired(bool nodeRetired) + : JobFixture(ALLOWED_LID_BLOAT, ALLOWED_LID_BLOAT_FACTOR, MAX_DOCS_TO_SCAN, RESOURCE_LIMIT_FACTOR, JOB_DELAY, nodeRetired) + {} +}; + +TEST_F("require that job is disabled when node is retired", JobFixtureWithNodeRetired(true)) +{ + f.setupOneDocumentToCompact(); + EXPECT_TRUE(f.run()); // not runnable, no work to do + TEST_DO(f.assertNoWorkDone()); +} + +TEST_F("require that job is disabled when node becomes retired", JobFixtureWithNodeRetired(false)) +{ + f.setupOneDocumentToCompact(); + f.notifyNodeRetired(true); + EXPECT_TRUE(f.run()); // not runnable, no work to do + TEST_DO(f.assertNoWorkDone()); +} + +TEST_F("require that job is re-enabled when node is no longer retired", JobFixtureWithNodeRetired(true)) +{ + f.setupOneDocumentToCompact(); + EXPECT_TRUE(f.run()); // not runnable, no work to do + TEST_DO(f.assertNoWorkDone()); + f.notifyNodeRetired(false); // triggers running of job + TEST_DO(f.assertOneDocumentCompacted()); +} + TEST_MAIN() { TEST_RUN_ALL(); diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp index 8ec337af2e4..be5225c5acf 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp @@ -1,10 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "lid_space_compaction_job.h" -#include "imaintenancejobrunner.h" #include "i_disk_mem_usage_notifier.h" +#include "iclusterstatechangednotifier.h" +#include "imaintenancejobrunner.h" +#include "lid_space_compaction_job.h" +#include <vespa/log/log.h> #include <vespa/searchcore/proton/common/eventlogger.h> #include <cassert> +LOG_SETUP(".proton.server.lid_space_compaction_job"); using search::DocumentMetaData; using search::LidUsageStats; @@ -82,7 +85,9 @@ LidSpaceCompactionJob::LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionC IOperationStorer &opStorer, IFrozenBucketHandler &frozenHandler, IDiskMemUsageNotifier &diskMemUsageNotifier, - double resourceLimitFactor) + double resourceLimitFactor, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + bool nodeRetired) : IMaintenanceJob("lid_space_compaction." + handler.getName(), config.getDelay(), config.getInterval()), _cfg(config), @@ -93,16 +98,21 @@ LidSpaceCompactionJob::LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionC _retryFrozenDocument(false), _shouldCompactLidSpace(false), _resourcesOK(true), + _nodeRetired(nodeRetired), _runnable(true), _runner(nullptr), _diskMemUsageNotifier(diskMemUsageNotifier), - _resourceLimitFactor(resourceLimitFactor) + _resourceLimitFactor(resourceLimitFactor), + _clusterStateChangedNotifier(clusterStateChangedNotifier) { _diskMemUsageNotifier.addDiskMemUsageListener(this); + _clusterStateChangedNotifier.addClusterStateChangedHandler(this); + refreshRunnable(); } LidSpaceCompactionJob::~LidSpaceCompactionJob() { + _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); _diskMemUsageNotifier.removeDiskMemUsageListener(this); } @@ -128,15 +138,12 @@ LidSpaceCompactionJob::run() void LidSpaceCompactionJob::refreshRunnable() { - _runnable = _resourcesOK; + _runnable = (_resourcesOK && !_nodeRetired); } void -LidSpaceCompactionJob::notifyDiskMemUsage(DiskMemUsageState state) +LidSpaceCompactionJob::refreshAndConsiderRunnable() { - // Called by master write thread - bool resourcesOK = !state.aboveDiskLimit(_resourceLimitFactor) && !state.aboveMemoryLimit(_resourceLimitFactor); - _resourcesOK = resourcesOK; bool oldRunnable = _runnable; refreshRunnable(); if (_runner && _runnable && !oldRunnable) { @@ -145,6 +152,30 @@ LidSpaceCompactionJob::notifyDiskMemUsage(DiskMemUsageState state) } void +LidSpaceCompactionJob::notifyDiskMemUsage(DiskMemUsageState state) +{ + // Called by master write thread + bool resourcesOK = !state.aboveDiskLimit(_resourceLimitFactor) && !state.aboveMemoryLimit(_resourceLimitFactor); + _resourcesOK = resourcesOK; + refreshAndConsiderRunnable(); +} + +void +LidSpaceCompactionJob::notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) +{ + // Called by master write thread + bool nodeRetired = newCalc->nodeRetired(); + if (nodeRetired && !_nodeRetired) { + LOG(info, "notifyClusterStateChanged(): Node is retired -> lid space compaction job disabled"); + } + if (!nodeRetired && _nodeRetired) { + LOG(info, "notifyClusterStateChanged(): Node is no longer retired -> lid space compaction job re-enabled"); + } + _nodeRetired = nodeRetired; + refreshAndConsiderRunnable(); +} + +void LidSpaceCompactionJob::registerRunner(IMaintenanceJobRunner *runner) { _runner = runner; diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h index b5e8e4567bc..32178ac69d8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h @@ -2,15 +2,19 @@ #pragma once #include "document_db_maintenance_config.h" +#include "i_disk_mem_usage_listener.h" #include "i_lid_space_compaction_handler.h" #include "i_maintenance_job.h" #include "i_operation_storer.h" -#include "i_disk_mem_usage_listener.h" +#include "ibucketstatecalculator.h" +#include "iclusterstatechangedhandler.h" +#include "iclusterstatechangednotifier.h" namespace proton { class IFrozenBucketHandler; class IDiskMemUsageNotifier; +class IClusterStateChangedNotifier; /** * Job that regularly checks whether lid space compaction should be performed @@ -20,21 +24,24 @@ class IDiskMemUsageNotifier; * A handler is typically working over a single document sub db. */ class LidSpaceCompactionJob : public IMaintenanceJob, - public IDiskMemUsageListener + public IDiskMemUsageListener, + public IClusterStateChangedHandler { private: const DocumentDBLidSpaceCompactionConfig _cfg; - ILidSpaceCompactionHandler &_handler; - IOperationStorer &_opStorer; - IFrozenBucketHandler &_frozenHandler; - IDocumentScanIterator::UP _scanItr; - bool _retryFrozenDocument; - bool _shouldCompactLidSpace; - bool _resourcesOK; - bool _runnable; // can try to perform work - IMaintenanceJobRunner *_runner; - IDiskMemUsageNotifier &_diskMemUsageNotifier; - double _resourceLimitFactor; + ILidSpaceCompactionHandler &_handler; + IOperationStorer &_opStorer; + IFrozenBucketHandler &_frozenHandler; + IDocumentScanIterator::UP _scanItr; + bool _retryFrozenDocument; + bool _shouldCompactLidSpace; + bool _resourcesOK; + bool _nodeRetired; + bool _runnable; // can try to perform work + IMaintenanceJobRunner *_runner; + IDiskMemUsageNotifier &_diskMemUsageNotifier; + double _resourceLimitFactor; + IClusterStateChangedNotifier &_clusterStateChangedNotifier; bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const; bool shouldRestartScanDocuments(const search::LidUsageStats &stats) const; @@ -42,6 +49,7 @@ private: bool scanDocuments(const search::LidUsageStats &stats); void compactLidSpace(const search::LidUsageStats &stats); void refreshRunnable(); + void refreshAndConsiderRunnable(); public: LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionConfig &config, @@ -49,12 +57,17 @@ public: IOperationStorer &opStorer, IFrozenBucketHandler &frozenHandler, IDiskMemUsageNotifier &diskMemUsageNotifier, - double resourceLimitFactor); + double resourceLimitFactor, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + bool nodeRetired); ~LidSpaceCompactionJob(); // Implements IDiskMemUsageListener virtual void notifyDiskMemUsage(DiskMemUsageState state) override; + // Implements IClusterStateChangedNofifier + virtual void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; + // Implements IMaintenanceJob virtual bool run() override; virtual void registerRunner(IMaintenanceJobRunner *runner) override; diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index 0a9eaca789b..5bebaa847f6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -31,14 +31,18 @@ injectLidSpaceCompactionJobs(MaintenanceController &controller, IOperationStorer &opStorer, IFrozenBucketHandler &fbHandler, const IJobTracker::SP &tracker, - IDiskMemUsageNotifier &diskMemUsageNotifier) + IDiskMemUsageNotifier &diskMemUsageNotifier, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + const std::shared_ptr<IBucketStateCalculator> &calc) { for (auto &lidHandler : lscHandlers) { IMaintenanceJob::UP job = IMaintenanceJob::UP (new LidSpaceCompactionJob(config.getLidSpaceCompactionConfig(), *lidHandler, opStorer, fbHandler, diskMemUsageNotifier, - config.getResourceLimitFactor())); + config.getResourceLimitFactor(), + clusterStateChangedNotifier, + (calc ? calc->nodeRetired() : false))); controller.registerJobInMasterThread(std::move(trackJob(tracker, std::move(job)))); } @@ -110,7 +114,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, if (!config.getLidSpaceCompactionConfig().isDisabled()) { injectLidSpaceCompactionJobs(controller, config, lscHandlers, opStorer, fbHandler, jobTrackers.getLidSpaceCompact(), - diskMemUsageNotifier); + diskMemUsageNotifier, clusterStateChangedNotifier, calc); } injectBucketMoveJob(controller, fbHandler, docTypeName, moveHandler, bucketModifiedHandler, clusterStateChangedNotifier, bucketStateChangedNotifier, calc, jobTrackers, diskMemUsageNotifier, config.getResourceLimitFactor()); |