diff options
author | Geir Storli <geirstorli@yahoo.no> | 2017-06-28 16:03:53 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-28 16:03:53 +0200 |
commit | 2242acb1197103febb9524d4eb3e9f66e8f7458f (patch) | |
tree | fd655639b0cdbd132ffccb547065f4d585bcf089 /searchcore | |
parent | 6a1a7705ca63ef26e87f66b9c6da41e68477a564 (diff) | |
parent | 7e9fd7da532597e719ceaeecb3a8c3e5edd808d9 (diff) |
Merge pull request #2898 from yahoo/geirst/consolidate-handling-of-blockable-maintenance-jobs
Geirst/consolidate handling of blockable maintenance jobs
Diffstat (limited to 'searchcore')
18 files changed, 316 insertions, 194 deletions
diff --git a/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp b/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp index 63de9f156ff..0a4b83350ba 100644 --- a/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp +++ b/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp @@ -2,18 +2,20 @@ #include <vespa/log/log.h> LOG_SETUP("job_tracked_maintenance_test"); +#include <vespa/searchcore/proton/server/i_blockable_maintenance_job.h> #include <vespa/searchcore/proton/server/job_tracked_maintenance_job.h> #include <vespa/searchcore/proton/test/simple_job_tracker.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/closuretask.h> -#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/sync.h> +#include <vespa/vespalib/util/threadstackexecutor.h> using namespace proton; using namespace vespalib; using test::SimpleJobTracker; -typedef std::unique_ptr<Gate> GateUP; -typedef std::vector<GateUP> GateVector; +using GateUP = std::unique_ptr<Gate>; +using GateVector = std::vector<GateUP>; +using BlockedReason = IBlockableMaintenanceJob::BlockedReason; GateVector getGateVector(size_t size) @@ -25,18 +27,22 @@ getGateVector(size_t size) return retval; } -struct MyMaintenanceJob : public IMaintenanceJob +struct MyMaintenanceJob : public IBlockableMaintenanceJob { GateVector _runGates; size_t _runIdx; + bool _blocked; MyMaintenanceJob(size_t numRuns) - : IMaintenanceJob("myjob", 10, 20), + : IBlockableMaintenanceJob("myjob", 10, 20), _runGates(getGateVector(numRuns)), - _runIdx(0) + _runIdx(0), + _blocked(false) {} - void block() { - setBlocked(true); - } + void block() { setBlocked(BlockedReason::RESOURCE_LIMITS); } + void unBlock() { unBlock(BlockedReason::RESOURCE_LIMITS); } + virtual void setBlocked(BlockedReason) override { _blocked = true; } + virtual void unBlock(BlockedReason) override { _blocked = false; } + virtual bool isBlocked() const override { return _blocked; } virtual bool run() override { _runGates[_runIdx++]->await(5000); return _runIdx == _runGates.size(); @@ -119,13 +125,14 @@ TEST_F("require that maintenance job that is destroyed is tracked", Fixture(2)) f.assertTracker(0, 0); } -TEST_F("require that block calls are sent to underlying job", Fixture) +TEST_F("require that block calls are sent to underlying jobs", Fixture) { EXPECT_FALSE(f._trackedJob->isBlocked()); + EXPECT_TRUE(f._trackedJob->asBlockable() != nullptr); f._myJob->block(); EXPECT_TRUE(f._myJob->isBlocked()); EXPECT_TRUE(f._trackedJob->isBlocked()); - f._trackedJob->unBlock(); + f._myJob->unBlock(); EXPECT_FALSE(f._myJob->isBlocked()); EXPECT_FALSE(f._trackedJob->isBlocked()); } diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp index f2f0029b924..494ae406484 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp @@ -20,6 +20,7 @@ using namespace search; using namespace search::index; using namespace vespalib; using storage::spi::Timestamp; +using BlockedReason = IBlockableMaintenanceJob::BlockedReason; const uint32_t SUBDB_ID = 2; const double JOB_DELAY = 1.0; @@ -393,7 +394,7 @@ TEST_F("require that job is blocked if trying to move document for frozen bucket EXPECT_TRUE(f._job.isBlocked()); f._frozenHandler._bucket = BUCKET_ID_2; - f._job.unBlock(); + f._job.unBlock(BlockedReason::FROZEN_BUCKET); EXPECT_FALSE(f.run()); // unblocked TEST_DO(f.assertJobContext(2, 9, 1, 0, 0)); diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 4b0c8d4b1b5..222e9e1a6fb 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -8,6 +8,7 @@ #include <vespa/searchcore/proton/feedoperation/pruneremoveddocumentsoperation.h> #include <vespa/searchcore/proton/feedoperation/putoperation.h> #include <vespa/searchcore/proton/feedoperation/removeoperation.h> +#include <vespa/searchcore/proton/server/blockable_maintenance_job.h> #include <vespa/searchcore/proton/server/executor_thread_service.h> #include <vespa/searchcore/proton/server/i_operation_storer.h> #include <vespa/searchcore/proton/server/ibucketmodifiedhandler.h> @@ -32,6 +33,7 @@ #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/log/log.h> + LOG_SETUP("maintenancecontroller_test"); using namespace proton; @@ -52,6 +54,8 @@ using vespalib::makeClosure; using vespalib::Slime; using proton::matching::ISessionCachePruner; +using BlockedReason = IBlockableMaintenanceJob::BlockedReason; + typedef BucketId::List BucketIdVector; typedef std::set<BucketId> BucketIdSet; @@ -277,7 +281,7 @@ public: } }; -struct MySimpleJob : public IMaintenanceJob +struct MySimpleJob : public BlockableMaintenanceJob { vespalib::CountDownLatch _latch; size_t _runCnt; @@ -285,12 +289,12 @@ struct MySimpleJob : public IMaintenanceJob MySimpleJob(double delay, double interval, uint32_t finishCount) - : IMaintenanceJob("my_job", delay, interval), + : BlockableMaintenanceJob("my_job", delay, interval), _latch(finishCount), _runCnt(0) { } - void block() { setBlocked(true); } + void block() { setBlocked(BlockedReason::FROZEN_BUCKET); } virtual bool run() override { LOG(info, "MySimpleJob::run()"); _latch.countDown(); @@ -315,17 +319,17 @@ struct MySplitJob : public MySimpleJob } }; -struct MyLongRunningJob : public IMaintenanceJob +struct MyLongRunningJob : public BlockableMaintenanceJob { vespalib::Gate _firstRun; MyLongRunningJob(double delay, double interval) - : IMaintenanceJob("long_running_job", delay, interval), + : BlockableMaintenanceJob("long_running_job", delay, interval), _firstRun() { } - void block() { setBlocked(true); } + void block() { setBlocked(BlockedReason::FROZEN_BUCKET); } virtual bool run() override { _firstRun.countDown(); usleep(10000); diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index c98bc6be8f4..7208a3695e7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -1,6 +1,7 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(searchcore_server STATIC SOURCES + blockable_maintenance_job.cpp bootstrapconfig.cpp bootstrapconfigmanager.cpp buckethandler.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.cpp b/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.cpp new file mode 100644 index 00000000000..b7dfd8196c8 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.cpp @@ -0,0 +1,97 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "blockable_maintenance_job.h" +#include "disk_mem_usage_state.h" +#include "imaintenancejobrunner.h" + +namespace proton { + +void +BlockableMaintenanceJob::updateBlocked(const LockGuard &) +{ + _blocked = !_blockReasons.empty(); +} + +void +BlockableMaintenanceJob::internalNotifyDiskMemUsage(const DiskMemUsageState &state) +{ + bool resourcesOK = !state.aboveDiskLimit(_resourceLimitFactor) && !state.aboveMemoryLimit(_resourceLimitFactor); + if (resourcesOK) { + if (isBlocked(BlockedReason::RESOURCE_LIMITS)) { + unBlock(BlockedReason::RESOURCE_LIMITS); + } + } else { + setBlocked(BlockedReason::RESOURCE_LIMITS); + } +} + +BlockableMaintenanceJob::BlockableMaintenanceJob(const vespalib::string &name, + double delay, + double interval) + : BlockableMaintenanceJob(name, delay, interval, 1.0) +{ +} + +BlockableMaintenanceJob::BlockableMaintenanceJob(const vespalib::string &name, + double delay, + double interval, + double resourceLimitFactor) + : IBlockableMaintenanceJob(name, delay, interval), + _mutex(), + _blockReasons(), + _blocked(false), + _runner(nullptr), + _resourceLimitFactor(resourceLimitFactor) +{ +} + +BlockableMaintenanceJob::~BlockableMaintenanceJob() +{ +} + +bool +BlockableMaintenanceJob::isBlocked(BlockedReason reason) +{ + LockGuard guard(_mutex); + return (_blockReasons.find(reason) != _blockReasons.end()); +} + +void +BlockableMaintenanceJob::considerRun() +{ + if (_runner && !isBlocked()) { + _runner->run(); + } +} + +void +BlockableMaintenanceJob::setBlocked(BlockedReason reason) +{ + LockGuard guard(_mutex); + _blockReasons.insert(reason); + updateBlocked(guard); +} + +void +BlockableMaintenanceJob::unBlock(BlockedReason reason) +{ + bool considerRun = false; + { + LockGuard guard(_mutex); + _blockReasons.erase(reason); + updateBlocked(guard); + considerRun = !_blocked; + } + if (_runner && considerRun) { + _runner->run(); + } +} + +bool +BlockableMaintenanceJob::isBlocked() const +{ + LockGuard guard(_mutex); + return _blocked; +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.h new file mode 100644 index 00000000000..a2f84ebe8fc --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.h @@ -0,0 +1,56 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "i_blockable_maintenance_job.h" +#include <mutex> +#include <unordered_set> + +namespace proton { + +class DiskMemUsageState; +class IMaintenanceJobRunner; + +/** + * Implementation of a maintenance job that can be blocked and unblocked due to various external reasons. + * A blocked job is not executed by the IMaintenanceJobRunner wrapping the job. + * When unblocked for a , the job is scheduled for execution again if it is totally unblocked. + */ +class BlockableMaintenanceJob : public IBlockableMaintenanceJob { +private: + using LockGuard = std::lock_guard<std::mutex>; + using ReasonSet = std::unordered_set<BlockedReason>; + + mutable std::mutex _mutex; + ReasonSet _blockReasons; + bool _blocked; + IMaintenanceJobRunner *_runner; + double _resourceLimitFactor; + + void updateBlocked(const LockGuard &guard); + +protected: + void internalNotifyDiskMemUsage(const DiskMemUsageState &state); + +public: + BlockableMaintenanceJob(const vespalib::string &name, + double delay, + double interval); + + BlockableMaintenanceJob(const vespalib::string &name, + double delay, + double interval, + double resourceLimitFactor); + + virtual ~BlockableMaintenanceJob(); + + bool isBlocked(BlockedReason reason); + void considerRun(); + + virtual void setBlocked(BlockedReason reason) override; + virtual void unBlock(BlockedReason reason) override; + virtual bool isBlocked() const override; + virtual void registerRunner(IMaintenanceJobRunner *runner) override { _runner = runner; } + +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp index cb2932e3066..4972c33fdd5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp @@ -25,7 +25,6 @@ const char * bool2str(bool v) { return (v ? "T" : "F"); } } - BucketMoveJob::ScanIterator:: ScanIterator(BucketDBOwner::Guard db, uint32_t pass, BucketId lastBucket, BucketId endBucket) : _db(std::move(db)), @@ -35,7 +34,6 @@ ScanIterator(BucketDBOwner::Guard db, uint32_t pass, BucketId lastBucket, Bucket { } - BucketMoveJob::ScanIterator:: ScanIterator(BucketDBOwner::Guard db, BucketId bucket) : _db(std::move(db)), @@ -44,7 +42,6 @@ ScanIterator(BucketDBOwner::Guard db, BucketId bucket) { } - BucketMoveJob::ScanIterator::ScanIterator(ScanIterator &&rhs) : _db(std::move(rhs._db)), _itr(rhs._itr), @@ -95,7 +92,6 @@ BucketMoveJob::checkBucket(const BucketId &bucket, _moveHandler, _ready._metaStore->getBucketDB()); } - BucketMoveJob::ScanResult BucketMoveJob::scanBuckets(size_t maxBucketsToScan, IFrozenBucketHandler::ExclusiveBucketGuard::UP & bucketGuard) { @@ -119,7 +115,6 @@ BucketMoveJob::scanBuckets(size_t maxBucketsToScan, IFrozenBucketHandler::Exclus return ScanResult(bucketsScanned, passDone); } - void BucketMoveJob::moveDocuments(DocumentBucketMover &mover, size_t maxDocsToMove, @@ -139,6 +134,18 @@ BucketMoveJob::moveDocuments(DocumentBucketMover &mover, } } +namespace { + +bool +blockedDueToClusterState(const IBucketStateCalculator::SP &calc) +{ + bool clusterUp = calc.get() != nullptr && calc->clusterUp(); + bool nodeUp = calc.get() != nullptr && calc->nodeUp(); + bool nodeInitializing = calc.get() != nullptr && calc->nodeInitializing(); + return !(clusterUp && nodeUp && !nodeInitializing); +} + +} BucketMoveJob:: BucketMoveJob(const IBucketStateCalculator::SP &calc, @@ -152,7 +159,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc, IDiskMemUsageNotifier &diskMemUsageNotifier, double resourceLimitFactor, const vespalib::string &docTypeName) - : IMaintenanceJob("move_buckets." + docTypeName, 0.0, 0.0), + : BlockableMaintenanceJob("move_buckets." + docTypeName, 0.0, 0.0, resourceLimitFactor), IClusterStateChangedHandler(), IBucketFreezeListener(), _calc(calc), @@ -169,26 +176,20 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc, _delayedBucketsFrozen(), _frozenBuckets(frozenBuckets), _delayedMover(), - _runner(nullptr), - _clusterUp(false), - _nodeUp(false), - _nodeInitializing(false), - _resourcesOK(false), - _runnable(false), _clusterStateChangedNotifier(clusterStateChangedNotifier), _bucketStateChangedNotifier(bucketStateChangedNotifier), - _diskMemUsageNotifier(diskMemUsageNotifier), - _resourceLimitFactor(resourceLimitFactor) + _diskMemUsageNotifier(diskMemUsageNotifier) { - refreshDerivedClusterState(); - + if (blockedDueToClusterState(_calc)) { + setBlocked(BlockedReason::CLUSTER_STATE); + } + _frozenBuckets.addListener(this); _clusterStateChangedNotifier.addClusterStateChangedHandler(this); _bucketStateChangedNotifier.addBucketStateChangedHandler(this); _diskMemUsageNotifier.addDiskMemUsageListener(this); } - BucketMoveJob::~BucketMoveJob() { _frozenBuckets.removeListener(this); @@ -197,21 +198,19 @@ BucketMoveJob::~BucketMoveJob() _diskMemUsageNotifier.removeDiskMemUsageListener(this); } - void BucketMoveJob::maybeCancelMover(DocumentBucketMover &mover) { // Cancel bucket if moving in wrong direction if (!mover.bucketDone()) { bool ready = mover.getSource() == &_ready; - if (!_runnable || + if (isBlocked() || _calc->shouldBeReady(mover.getBucket()) == ready) { mover.cancel(); } } } - void BucketMoveJob::maybeDelayMover(DocumentBucketMover &mover, BucketId bucket) { @@ -228,20 +227,16 @@ BucketMoveJob::notifyThawedBucket(const BucketId &bucket) { if (_delayedBucketsFrozen.erase(bucket) != 0u) { _delayedBuckets.insert(bucket); - if (_runner && _runnable) { - _runner->run(); - } + considerRun(); } } - void BucketMoveJob::deactivateBucket(BucketId bucket) { _delayedBuckets.insert(bucket); } - void BucketMoveJob::activateBucket(BucketId bucket) { @@ -268,7 +263,6 @@ BucketMoveJob::changedCalculator() maybeCancelMover(_delayedMover); } - void BucketMoveJob::scanAndMove(size_t maxBucketsToScan, size_t maxDocsToMove) @@ -318,47 +312,26 @@ BucketMoveJob::scanAndMove(size_t maxBucketsToScan, } } -void -BucketMoveJob::registerRunner(IMaintenanceJobRunner *runner) -{ - _runner = runner; -} - - bool BucketMoveJob::run() { - if (!_runnable) + if (isBlocked()) { return true; // indicate work is done, since node state is bad + } scanAndMove(200, 1); return done(); } void -BucketMoveJob::refreshRunnable() -{ - _runnable = _clusterUp && _nodeUp && !_nodeInitializing && _resourcesOK; -} - -void -BucketMoveJob::refreshDerivedClusterState() -{ - _clusterUp = _calc.get() != NULL && _calc->clusterUp(); - _nodeUp = _calc.get() != NULL && _calc->nodeUp(); - _nodeInitializing = _calc.get() != NULL && _calc->nodeInitializing(); - refreshRunnable(); -} - -void -BucketMoveJob::notifyClusterStateChanged(const IBucketStateCalculator::SP & - newCalc) +BucketMoveJob::notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) { // Called by master write thread _calc = newCalc; - refreshDerivedClusterState(); changedCalculator(); - if (_runner && _runnable) { - _runner->run(); + if (blockedDueToClusterState(_calc)) { + setBlocked(BlockedReason::CLUSTER_STATE); + } else { + unBlock(BlockedReason::CLUSTER_STATE); } } @@ -372,20 +345,16 @@ BucketMoveJob::notifyBucketStateChanged(const BucketId &bucketId, } else { activateBucket(bucketId); } - if (!done() && _runner && _runnable) { - _runner->run(); + if (!done()) { + considerRun(); } } -void BucketMoveJob::notifyDiskMemUsage(DiskMemUsageState state) +void +BucketMoveJob::notifyDiskMemUsage(DiskMemUsageState state) { // Called by master write thread - bool resourcesOK = !state.aboveDiskLimit(_resourceLimitFactor) && !state.aboveMemoryLimit(_resourceLimitFactor); - _resourcesOK = resourcesOK; - refreshRunnable(); - if (_runner && _runnable) { - _runner->run(); - } + internalNotifyDiskMemUsage(state); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h index 16e3ad5ad93..ffbcf68f769 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h @@ -2,15 +2,15 @@ #pragma once -#include "ibucketstatecalculator.h" -#include "ibucketmodifiedhandler.h" -#include "ifrozenbuckethandler.h" +#include "blockable_maintenance_job.h" #include "documentbucketmover.h" -#include "i_maintenance_job.h" -#include "iclusterstatechangedhandler.h" +#include "i_disk_mem_usage_listener.h" #include "ibucketfreezelistener.h" +#include "ibucketmodifiedhandler.h" +#include "ibucketstatecalculator.h" #include "ibucketstatechangedhandler.h" -#include "i_disk_mem_usage_listener.h" +#include "iclusterstatechangedhandler.h" +#include "ifrozenbuckethandler.h" #include <vespa/searchcore/proton/bucketdb/bucket_db_owner.h> #include <set> @@ -24,9 +24,9 @@ class IDiskMemUsageNotifier; /** * Class used to control the moving of buckets between the ready and - * not ready sub databases. + * not ready sub databases based on the readiness of buckets according to the cluster state. */ -class BucketMoveJob : public IMaintenanceJob, +class BucketMoveJob : public BlockableMaintenanceJob, public IClusterStateChangedHandler, public IBucketFreezeListener, public IBucketStateChangedHandler, @@ -97,16 +97,9 @@ private: DelayedBucketSet _delayedBucketsFrozen; IFrozenBucketHandler &_frozenBuckets; DocumentBucketMover _delayedMover; - IMaintenanceJobRunner *_runner; - bool _clusterUp; - bool _nodeUp; - bool _nodeInitializing; - bool _resourcesOK; - bool _runnable; // can try to perform work IClusterStateChangedNotifier &_clusterStateChangedNotifier; IBucketStateChangedNotifier &_bucketStateChangedNotifier; IDiskMemUsageNotifier &_diskMemUsageNotifier; - double _resourceLimitFactor; ScanResult scanBuckets(size_t maxBucketsToScan, @@ -126,9 +119,6 @@ private: DocumentBucketMover &mover, IFrozenBucketHandler::ExclusiveBucketGuard::UP & bucketGuard); - void refreshRunnable(); - void refreshDerivedClusterState(); - /** * Signal that the given bucket should be de-activated. * An active bucket is not considered for moving from ready to not ready sub database. @@ -169,25 +159,17 @@ public: } // IMaintenanceJob API - virtual void registerRunner(IMaintenanceJobRunner *runner) override; - - // IMaintenanceJob API virtual bool run() override; // IClusterStateChangedHandler API virtual void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; - // IBucketFreezeListener API - /** - * Signal that the given bucket has been thawed. - * A thawed bucket can be considered for moving. - */ virtual void notifyThawedBucket(const document::BucketId &bucket) override; // IBucketStateChangedHandler API - void notifyBucketStateChanged(const document::BucketId &bucketId, - storage::spi::BucketInfo::ActiveState newState) override; + virtual void notifyBucketStateChanged(const document::BucketId &bucketId, + storage::spi::BucketInfo::ActiveState newState) override; virtual void notifyDiskMemUsage(DiskMemUsageState state) override; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp b/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp index d6353784fd7..3b3437b4346 100644 --- a/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp @@ -25,32 +25,27 @@ class ClusterStateAdapter : public IBucketStateCalculator { private: ClusterState _calc; + bool _clusterUp; + bool _nodeUp; + bool _nodeInitializing; + bool _nodeRetired; public: ClusterStateAdapter(const ClusterState &calc) - : _calc(calc) + : _calc(calc), + _clusterUp(_calc.clusterUp()), + _nodeUp(_calc.nodeUp()), + _nodeInitializing(_calc.nodeInitializing()), + _nodeRetired(_calc.nodeRetired()) { } - bool shouldBeReady(const document::BucketId &bucket) const override { return _calc.shouldBeReady(Bucket(bucket, PartitionId(0))); } - - bool clusterUp() const override { - return _calc.clusterUp(); - } - - bool nodeUp() const override { - return _calc.nodeUp(); - } - - bool nodeInitializing() const override { - return _calc.nodeInitializing(); - } - - bool nodeRetired() const override { - return _calc.nodeRetired(); - } + bool clusterUp() const override { return _clusterUp; } + bool nodeUp() const override { return _nodeUp; } + bool nodeInitializing() const override { return _nodeInitializing; } + bool nodeRetired() const override { return _nodeRetired; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h new file mode 100644 index 00000000000..aa4d5ed79b6 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h @@ -0,0 +1,42 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "i_maintenance_job.h" +#include <string> + +namespace proton { + +/** + * Interface for a maintenance job that can be blocked and unblocked due to various external reasons. + * A blocked job is not executed. When unblocked, the job should be scheduled for execution again. + */ +class IBlockableMaintenanceJob : public IMaintenanceJob { +public: + enum class BlockedReason { + RESOURCE_LIMITS = 0, + FROZEN_BUCKET = 1, + CLUSTER_STATE = 2 + }; + + IBlockableMaintenanceJob(const vespalib::string &name, + double delay, + double interval) + : IMaintenanceJob(name, delay, interval) + {} + + /** + * Block this job due to the given reason. + * Should be called from the same executor thread as the one used in IMaintenanceJobRunner. + */ + virtual void setBlocked(BlockedReason reason) = 0; + + /** + * Unblock this job for the given reason and consider running the job again if not blocked anymore. + * Can be called from any thread. + */ + virtual void unBlock(BlockedReason reason) = 0; + + virtual IBlockableMaintenanceJob *asBlockable() override { return this; } +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h index 1c83e2022c6..e32fd01df63 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h @@ -5,6 +5,7 @@ namespace proton { +class IBlockableMaintenanceJob; class IMaintenanceJobRunner; /** @@ -17,10 +18,6 @@ private: const vespalib::string _name; const double _delay; const double _interval; - bool _blocked; - -protected: - void setBlocked(bool v) { _blocked = v; } public: typedef std::unique_ptr<IMaintenanceJob> UP; @@ -30,8 +27,7 @@ public: double interval) : _name(name), _delay(delay), - _interval(interval), - _blocked(false) + _interval(interval) {} virtual ~IMaintenanceJob() {} @@ -39,8 +35,8 @@ public: virtual const vespalib::string &getName() const { return _name; } virtual double getDelay() const { return _delay; } virtual double getInterval() const { return _interval; } - virtual bool isBlocked() const { return _blocked; } - virtual void unBlock() { setBlocked(false); } + virtual bool isBlocked() const { return false; } + virtual IBlockableMaintenanceJob *asBlockable() { return nullptr; } /** * Register maintenance job runner, in case event passed to the diff --git a/searchcore/src/vespa/searchcore/proton/server/ibucketfreezelistener.h b/searchcore/src/vespa/searchcore/proton/server/ibucketfreezelistener.h index b6c2898d5e4..719e7746326 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ibucketfreezelistener.h +++ b/searchcore/src/vespa/searchcore/proton/server/ibucketfreezelistener.h @@ -14,6 +14,11 @@ class IBucketFreezeListener { public: virtual ~IBucketFreezeListener() {} + + /** + * Signal that the given bucket has been thawed. + * A thawed bucket can be considered for moving. + */ virtual void notifyThawedBucket(const document::BucketId &bucket) = 0; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h index 8358d65087f..37b4426ee18 100644 --- a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h @@ -23,7 +23,7 @@ public: // Implements IMaintenanceJob virtual bool isBlocked() const override { return _job->isBlocked(); } - virtual void unBlock() override { _job->unBlock(); } + virtual IBlockableMaintenanceJob *asBlockable() override { return _job->asBlockable(); } virtual void registerRunner(IMaintenanceJobRunner *runner) override { _job->registerRunner(runner); } diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp index be5225c5acf..3cddfcc6510 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp @@ -48,7 +48,7 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats) IFrozenBucketHandler::ExclusiveBucketGuard::UP bucketGuard = _frozenHandler.acquireExclusiveBucket(document.bucketId); if ( ! bucketGuard ) { // the job is blocked until the bucket for this document is thawed - setBlocked(true); + setBlocked(BlockedReason::FROZEN_BUCKET); _retryFrozenDocument = true; return true; } else { @@ -88,8 +88,8 @@ LidSpaceCompactionJob::LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionC double resourceLimitFactor, IClusterStateChangedNotifier &clusterStateChangedNotifier, bool nodeRetired) - : IMaintenanceJob("lid_space_compaction." + handler.getName(), - config.getDelay(), config.getInterval()), + : BlockableMaintenanceJob("lid_space_compaction." + handler.getName(), + config.getDelay(), config.getInterval(), resourceLimitFactor), _cfg(config), _handler(handler), _opStorer(opStorer), @@ -97,17 +97,14 @@ LidSpaceCompactionJob::LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionC _scanItr(), _retryFrozenDocument(false), _shouldCompactLidSpace(false), - _resourcesOK(true), - _nodeRetired(nodeRetired), - _runnable(true), - _runner(nullptr), _diskMemUsageNotifier(diskMemUsageNotifier), - _resourceLimitFactor(resourceLimitFactor), _clusterStateChangedNotifier(clusterStateChangedNotifier) { _diskMemUsageNotifier.addDiskMemUsageListener(this); _clusterStateChangedNotifier.addClusterStateChangedHandler(this); - refreshRunnable(); + if (nodeRetired) { + setBlocked(BlockedReason::CLUSTER_STATE); + } } LidSpaceCompactionJob::~LidSpaceCompactionJob() @@ -119,7 +116,7 @@ LidSpaceCompactionJob::~LidSpaceCompactionJob() bool LidSpaceCompactionJob::run() { - if (!_runnable) { + if (isBlocked()) { return true; // indicate work is done since no work can be done } LidUsageStats stats = _handler.getLidStatus(); @@ -136,28 +133,10 @@ LidSpaceCompactionJob::run() } void -LidSpaceCompactionJob::refreshRunnable() -{ - _runnable = (_resourcesOK && !_nodeRetired); -} - -void -LidSpaceCompactionJob::refreshAndConsiderRunnable() -{ - bool oldRunnable = _runnable; - refreshRunnable(); - if (_runner && _runnable && !oldRunnable) { - _runner->run(); - } -} - -void LidSpaceCompactionJob::notifyDiskMemUsage(DiskMemUsageState state) { // Called by master write thread - bool resourcesOK = !state.aboveDiskLimit(_resourceLimitFactor) && !state.aboveMemoryLimit(_resourceLimitFactor); - _resourcesOK = resourcesOK; - refreshAndConsiderRunnable(); + internalNotifyDiskMemUsage(state); } void @@ -165,20 +144,15 @@ LidSpaceCompactionJob::notifyClusterStateChanged(const IBucketStateCalculator::S { // Called by master write thread bool nodeRetired = newCalc->nodeRetired(); - if (nodeRetired && !_nodeRetired) { + if (!nodeRetired) { + if (isBlocked(BlockedReason::CLUSTER_STATE)) { + LOG(info, "notifyClusterStateChanged(): Node is no longer retired -> lid space compaction job re-enabled"); + unBlock(BlockedReason::CLUSTER_STATE); + } + } else { LOG(info, "notifyClusterStateChanged(): Node is retired -> lid space compaction job disabled"); + setBlocked(BlockedReason::CLUSTER_STATE); } - if (!nodeRetired && _nodeRetired) { - LOG(info, "notifyClusterStateChanged(): Node is no longer retired -> lid space compaction job re-enabled"); - } - _nodeRetired = nodeRetired; - refreshAndConsiderRunnable(); -} - -void -LidSpaceCompactionJob::registerRunner(IMaintenanceJobRunner *runner) -{ - _runner = runner; } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h index 32178ac69d8..f260671344c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h @@ -1,10 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "blockable_maintenance_job.h" #include "document_db_maintenance_config.h" #include "i_disk_mem_usage_listener.h" #include "i_lid_space_compaction_handler.h" -#include "i_maintenance_job.h" #include "i_operation_storer.h" #include "ibucketstatecalculator.h" #include "iclusterstatechangedhandler.h" @@ -23,7 +23,7 @@ class IClusterStateChangedNotifier; * Compaction is handled by moving documents from high lids to low free lids. * A handler is typically working over a single document sub db. */ -class LidSpaceCompactionJob : public IMaintenanceJob, +class LidSpaceCompactionJob : public BlockableMaintenanceJob, public IDiskMemUsageListener, public IClusterStateChangedHandler { @@ -35,12 +35,7 @@ private: IDocumentScanIterator::UP _scanItr; bool _retryFrozenDocument; bool _shouldCompactLidSpace; - bool _resourcesOK; - bool _nodeRetired; - bool _runnable; // can try to perform work - IMaintenanceJobRunner *_runner; IDiskMemUsageNotifier &_diskMemUsageNotifier; - double _resourceLimitFactor; IClusterStateChangedNotifier &_clusterStateChangedNotifier; bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const; @@ -70,7 +65,6 @@ public: // Implements IMaintenanceJob virtual bool run() override; - virtual void registerRunner(IMaintenanceJobRunner *runner) override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index 3d029c26982..79135d27e5c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -3,6 +3,7 @@ #include "maintenancecontroller.h" #include "maintenancejobrunner.h" #include "document_db_maintenance_config.h" +#include "i_blockable_maintenance_job.h" #include <vespa/searchcorespi/index/i_thread_service.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/timer.h> @@ -219,10 +220,9 @@ MaintenanceController::notifyThawedBucket(const BucketId &bucket) (void) bucket; // No need to take _jobsLock as modification of _jobs also happens in master write thread. for (const auto &jw : _jobs) { - IMaintenanceJob &job = jw->getJob(); - if (job.isBlocked()) { - job.unBlock(); - jw->run(); + IBlockableMaintenanceJob *job = jw->getJob().asBlockable(); + if (job && job->isBlocked()) { + job->unBlock(IBlockableMaintenanceJob::BlockedReason::FROZEN_BUCKET); } } } diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp index 637fa3363b6..96952cf9f13 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp @@ -22,7 +22,7 @@ PruneRemovedDocumentsJob(const Config &config, const vespalib::string &docTypeName, IPruneRemovedDocumentsHandler &handler, IFrozenBucketHandler &frozenHandler) - : IMaintenanceJob("prune_removed_documents." + docTypeName, + : BlockableMaintenanceJob("prune_removed_documents." + docTypeName, config.getDelay(), config.getInterval()), _metaStore(metaStore), _subDbId(subDbId), @@ -85,7 +85,7 @@ PruneRemovedDocumentsJob::run() BucketId bucket(metaData.getBucketId()); IFrozenBucketHandler::ExclusiveBucketGuard::UP bucketGuard = _frozenHandler.acquireExclusiveBucket(bucket); if ( ! bucketGuard ) { - setBlocked(true); + setBlocked(BlockedReason::FROZEN_BUCKET); _nextLid = lid; flush(olid, lid, ageLimit); return true; diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h index 4f97268cd35..e2621bc6118 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h @@ -1,10 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "blockable_maintenance_job.h" #include "document_db_maintenance_config.h" -#include "i_maintenance_job.h" #include <persistence/spi/types.h> -// #include <vespa/searchlib/common/idocumentmetastore.h> namespace proton { @@ -17,7 +16,7 @@ class IFrozenBucketHandler; * Job that regularly checks whether old removed documents should be * forgotten. */ -class PruneRemovedDocumentsJob : public IMaintenanceJob +class PruneRemovedDocumentsJob : public BlockableMaintenanceJob { private: const IDocumentMetaStore &_metaStore; // external ownership |