diff options
author | Geir Storli <geirst@yahoo-inc.com> | 2017-06-27 15:42:38 +0000 |
---|---|---|
committer | Geir Storli <geirst@yahoo-inc.com> | 2017-06-27 15:42:38 +0000 |
commit | 25301438e6924af0420cbf8b124f97f1ce7281d1 (patch) | |
tree | 908ab2d189d863c09a9c2d77a41fee4c16cccf11 /searchcore/src | |
parent | f7906fd2c5c6d47613ed19cb81bc841726a2ebad (diff) |
Move info on whether a maintenance job is blocked to separate interface and impl.
Diffstat (limited to 'searchcore/src')
14 files changed, 190 insertions, 40 deletions
diff --git a/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp b/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp index 63de9f156ff..0a4b83350ba 100644 --- a/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp +++ b/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp @@ -2,18 +2,20 @@ #include <vespa/log/log.h> LOG_SETUP("job_tracked_maintenance_test"); +#include <vespa/searchcore/proton/server/i_blockable_maintenance_job.h> #include <vespa/searchcore/proton/server/job_tracked_maintenance_job.h> #include <vespa/searchcore/proton/test/simple_job_tracker.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/closuretask.h> -#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/sync.h> +#include <vespa/vespalib/util/threadstackexecutor.h> using namespace proton; using namespace vespalib; using test::SimpleJobTracker; -typedef std::unique_ptr<Gate> GateUP; -typedef std::vector<GateUP> GateVector; +using GateUP = std::unique_ptr<Gate>; +using GateVector = std::vector<GateUP>; +using BlockedReason = IBlockableMaintenanceJob::BlockedReason; GateVector getGateVector(size_t size) @@ -25,18 +27,22 @@ getGateVector(size_t size) return retval; } -struct MyMaintenanceJob : public IMaintenanceJob +struct MyMaintenanceJob : public IBlockableMaintenanceJob { GateVector _runGates; size_t _runIdx; + bool _blocked; MyMaintenanceJob(size_t numRuns) - : IMaintenanceJob("myjob", 10, 20), + : IBlockableMaintenanceJob("myjob", 10, 20), _runGates(getGateVector(numRuns)), - _runIdx(0) + _runIdx(0), + _blocked(false) {} - void block() { - setBlocked(true); - } + void block() { setBlocked(BlockedReason::RESOURCE_LIMITS); } + void unBlock() { unBlock(BlockedReason::RESOURCE_LIMITS); } + virtual void setBlocked(BlockedReason) override { _blocked = true; } + virtual void unBlock(BlockedReason) override { _blocked = false; } + virtual bool isBlocked() const override { return _blocked; } virtual bool run() override { _runGates[_runIdx++]->await(5000); return _runIdx == _runGates.size(); @@ -119,13 +125,14 @@ TEST_F("require that maintenance job that is destroyed is tracked", Fixture(2)) f.assertTracker(0, 0); } -TEST_F("require that block calls are sent to underlying job", Fixture) +TEST_F("require that block calls are sent to underlying jobs", Fixture) { EXPECT_FALSE(f._trackedJob->isBlocked()); + EXPECT_TRUE(f._trackedJob->asBlockable() != nullptr); f._myJob->block(); EXPECT_TRUE(f._myJob->isBlocked()); EXPECT_TRUE(f._trackedJob->isBlocked()); - f._trackedJob->unBlock(); + f._myJob->unBlock(); EXPECT_FALSE(f._myJob->isBlocked()); EXPECT_FALSE(f._trackedJob->isBlocked()); } diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp index f2f0029b924..494ae406484 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp @@ -20,6 +20,7 @@ using namespace search; using namespace search::index; using namespace vespalib; using storage::spi::Timestamp; +using BlockedReason = IBlockableMaintenanceJob::BlockedReason; const uint32_t SUBDB_ID = 2; const double JOB_DELAY = 1.0; @@ -393,7 +394,7 @@ TEST_F("require that job is blocked if trying to move document for frozen bucket EXPECT_TRUE(f._job.isBlocked()); f._frozenHandler._bucket = BUCKET_ID_2; - f._job.unBlock(); + f._job.unBlock(BlockedReason::FROZEN_BUCKET); EXPECT_FALSE(f.run()); // unblocked TEST_DO(f.assertJobContext(2, 9, 1, 0, 0)); diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 4b0c8d4b1b5..222e9e1a6fb 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -8,6 +8,7 @@ #include <vespa/searchcore/proton/feedoperation/pruneremoveddocumentsoperation.h> #include <vespa/searchcore/proton/feedoperation/putoperation.h> #include <vespa/searchcore/proton/feedoperation/removeoperation.h> +#include <vespa/searchcore/proton/server/blockable_maintenance_job.h> #include <vespa/searchcore/proton/server/executor_thread_service.h> #include <vespa/searchcore/proton/server/i_operation_storer.h> #include <vespa/searchcore/proton/server/ibucketmodifiedhandler.h> @@ -32,6 +33,7 @@ #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/log/log.h> + LOG_SETUP("maintenancecontroller_test"); using namespace proton; @@ -52,6 +54,8 @@ using vespalib::makeClosure; using vespalib::Slime; using proton::matching::ISessionCachePruner; +using BlockedReason = IBlockableMaintenanceJob::BlockedReason; + typedef BucketId::List BucketIdVector; typedef std::set<BucketId> BucketIdSet; @@ -277,7 +281,7 @@ public: } }; -struct MySimpleJob : public IMaintenanceJob +struct MySimpleJob : public BlockableMaintenanceJob { vespalib::CountDownLatch _latch; size_t _runCnt; @@ -285,12 +289,12 @@ struct MySimpleJob : public IMaintenanceJob MySimpleJob(double delay, double interval, uint32_t finishCount) - : IMaintenanceJob("my_job", delay, interval), + : BlockableMaintenanceJob("my_job", delay, interval), _latch(finishCount), _runCnt(0) { } - void block() { setBlocked(true); } + void block() { setBlocked(BlockedReason::FROZEN_BUCKET); } virtual bool run() override { LOG(info, "MySimpleJob::run()"); _latch.countDown(); @@ -315,17 +319,17 @@ struct MySplitJob : public MySimpleJob } }; -struct MyLongRunningJob : public IMaintenanceJob +struct MyLongRunningJob : public BlockableMaintenanceJob { vespalib::Gate _firstRun; MyLongRunningJob(double delay, double interval) - : IMaintenanceJob("long_running_job", delay, interval), + : BlockableMaintenanceJob("long_running_job", delay, interval), _firstRun() { } - void block() { setBlocked(true); } + void block() { setBlocked(BlockedReason::FROZEN_BUCKET); } virtual bool run() override { _firstRun.countDown(); usleep(10000); diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index c98bc6be8f4..7208a3695e7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -1,6 +1,7 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(searchcore_server STATIC SOURCES + blockable_maintenance_job.cpp bootstrapconfig.cpp bootstrapconfigmanager.cpp buckethandler.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.cpp b/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.cpp new file mode 100644 index 00000000000..8cf920c0460 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.cpp @@ -0,0 +1,52 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "blockable_maintenance_job.h" +#include "imaintenancejobrunner.h" + +namespace proton { + +void +BlockableMaintenanceJob::updateBlocked(const LockGuard &) +{ + _blocked = !_blockReasons.empty(); +} + +BlockableMaintenanceJob::BlockableMaintenanceJob(const vespalib::string &name, + double delay, + double interval) + : IBlockableMaintenanceJob(name, delay, interval), + _mutex(), + _blockReasons(), + _blocked(false), + _runner(nullptr) +{ +} + +BlockableMaintenanceJob::~BlockableMaintenanceJob() +{ +} + +void +BlockableMaintenanceJob::setBlocked(BlockedReason reason) +{ + LockGuard guard(_mutex); + _blockReasons.insert(reason); + updateBlocked(guard); +} + +void +BlockableMaintenanceJob::unBlock(BlockedReason reason) +{ + bool blocked = false; + { + LockGuard guard(_mutex); + _blockReasons.erase(reason); + updateBlocked(guard); + blocked = _blocked; + } + if (_runner && !blocked) { + _runner->run(); + } +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.h new file mode 100644 index 00000000000..a2223b82e94 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.h @@ -0,0 +1,48 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "i_blockable_maintenance_job.h" +#include <mutex> +#include <unordered_set> + +namespace proton { + +class IMaintenanceJobRunner; + +/** + * Implementation of a maintenance job that can be blocked and unblocked due to various external reasons. + * A blocked job is not executed by the IMaintenanceJobRunner wrapping the job. + * When unblocked for a , the job is scheduled for execution again if it is totally unblocked. + */ +class BlockableMaintenanceJob : public IBlockableMaintenanceJob { +private: + using LockGuard = std::lock_guard<std::mutex>; + using ReasonSet = std::unordered_set<BlockedReason>; + + mutable std::mutex _mutex; + ReasonSet _blockReasons; + bool _blocked; + IMaintenanceJobRunner *_runner; + + void updateBlocked(const LockGuard &guard); + +public: + BlockableMaintenanceJob(const vespalib::string &name, + double delay, + double interval); + + virtual ~BlockableMaintenanceJob(); + + virtual void setBlocked(BlockedReason reason) override; + + virtual void unBlock(BlockedReason reason) override; + + virtual bool isBlocked() const override { + LockGuard guard(_mutex); + return _blocked; + } + virtual void registerRunner(IMaintenanceJobRunner *runner) override { _runner = runner; } + +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h new file mode 100644 index 00000000000..aa4d5ed79b6 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h @@ -0,0 +1,42 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "i_maintenance_job.h" +#include <string> + +namespace proton { + +/** + * Interface for a maintenance job that can be blocked and unblocked due to various external reasons. + * A blocked job is not executed. When unblocked, the job should be scheduled for execution again. + */ +class IBlockableMaintenanceJob : public IMaintenanceJob { +public: + enum class BlockedReason { + RESOURCE_LIMITS = 0, + FROZEN_BUCKET = 1, + CLUSTER_STATE = 2 + }; + + IBlockableMaintenanceJob(const vespalib::string &name, + double delay, + double interval) + : IMaintenanceJob(name, delay, interval) + {} + + /** + * Block this job due to the given reason. + * Should be called from the same executor thread as the one used in IMaintenanceJobRunner. + */ + virtual void setBlocked(BlockedReason reason) = 0; + + /** + * Unblock this job for the given reason and consider running the job again if not blocked anymore. + * Can be called from any thread. + */ + virtual void unBlock(BlockedReason reason) = 0; + + virtual IBlockableMaintenanceJob *asBlockable() override { return this; } +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h index 1c83e2022c6..e32fd01df63 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h @@ -5,6 +5,7 @@ namespace proton { +class IBlockableMaintenanceJob; class IMaintenanceJobRunner; /** @@ -17,10 +18,6 @@ private: const vespalib::string _name; const double _delay; const double _interval; - bool _blocked; - -protected: - void setBlocked(bool v) { _blocked = v; } public: typedef std::unique_ptr<IMaintenanceJob> UP; @@ -30,8 +27,7 @@ public: double interval) : _name(name), _delay(delay), - _interval(interval), - _blocked(false) + _interval(interval) {} virtual ~IMaintenanceJob() {} @@ -39,8 +35,8 @@ public: virtual const vespalib::string &getName() const { return _name; } virtual double getDelay() const { return _delay; } virtual double getInterval() const { return _interval; } - virtual bool isBlocked() const { return _blocked; } - virtual void unBlock() { setBlocked(false); } + virtual bool isBlocked() const { return false; } + virtual IBlockableMaintenanceJob *asBlockable() { return nullptr; } /** * Register maintenance job runner, in case event passed to the diff --git a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h index 8358d65087f..37b4426ee18 100644 --- a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h @@ -23,7 +23,7 @@ public: // Implements IMaintenanceJob virtual bool isBlocked() const override { return _job->isBlocked(); } - virtual void unBlock() override { _job->unBlock(); } + virtual IBlockableMaintenanceJob *asBlockable() override { return _job->asBlockable(); } virtual void registerRunner(IMaintenanceJobRunner *runner) override { _job->registerRunner(runner); } diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp index be5225c5acf..99071cb5bf1 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp @@ -48,7 +48,7 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats) IFrozenBucketHandler::ExclusiveBucketGuard::UP bucketGuard = _frozenHandler.acquireExclusiveBucket(document.bucketId); if ( ! bucketGuard ) { // the job is blocked until the bucket for this document is thawed - setBlocked(true); + setBlocked(BlockedReason::FROZEN_BUCKET); _retryFrozenDocument = true; return true; } else { @@ -88,7 +88,7 @@ LidSpaceCompactionJob::LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionC double resourceLimitFactor, IClusterStateChangedNotifier &clusterStateChangedNotifier, bool nodeRetired) - : IMaintenanceJob("lid_space_compaction." + handler.getName(), + : BlockableMaintenanceJob("lid_space_compaction." + handler.getName(), config.getDelay(), config.getInterval()), _cfg(config), _handler(handler), diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h index 32178ac69d8..6afa9d11a98 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h @@ -1,10 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "blockable_maintenance_job.h" #include "document_db_maintenance_config.h" #include "i_disk_mem_usage_listener.h" #include "i_lid_space_compaction_handler.h" -#include "i_maintenance_job.h" #include "i_operation_storer.h" #include "ibucketstatecalculator.h" #include "iclusterstatechangedhandler.h" @@ -23,7 +23,7 @@ class IClusterStateChangedNotifier; * Compaction is handled by moving documents from high lids to low free lids. * A handler is typically working over a single document sub db. */ -class LidSpaceCompactionJob : public IMaintenanceJob, +class LidSpaceCompactionJob : public BlockableMaintenanceJob, public IDiskMemUsageListener, public IClusterStateChangedHandler { diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index 3d029c26982..79135d27e5c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -3,6 +3,7 @@ #include "maintenancecontroller.h" #include "maintenancejobrunner.h" #include "document_db_maintenance_config.h" +#include "i_blockable_maintenance_job.h" #include <vespa/searchcorespi/index/i_thread_service.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/timer.h> @@ -219,10 +220,9 @@ MaintenanceController::notifyThawedBucket(const BucketId &bucket) (void) bucket; // No need to take _jobsLock as modification of _jobs also happens in master write thread. for (const auto &jw : _jobs) { - IMaintenanceJob &job = jw->getJob(); - if (job.isBlocked()) { - job.unBlock(); - jw->run(); + IBlockableMaintenanceJob *job = jw->getJob().asBlockable(); + if (job && job->isBlocked()) { + job->unBlock(IBlockableMaintenanceJob::BlockedReason::FROZEN_BUCKET); } } } diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp index 637fa3363b6..96952cf9f13 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp @@ -22,7 +22,7 @@ PruneRemovedDocumentsJob(const Config &config, const vespalib::string &docTypeName, IPruneRemovedDocumentsHandler &handler, IFrozenBucketHandler &frozenHandler) - : IMaintenanceJob("prune_removed_documents." + docTypeName, + : BlockableMaintenanceJob("prune_removed_documents." + docTypeName, config.getDelay(), config.getInterval()), _metaStore(metaStore), _subDbId(subDbId), @@ -85,7 +85,7 @@ PruneRemovedDocumentsJob::run() BucketId bucket(metaData.getBucketId()); IFrozenBucketHandler::ExclusiveBucketGuard::UP bucketGuard = _frozenHandler.acquireExclusiveBucket(bucket); if ( ! bucketGuard ) { - setBlocked(true); + setBlocked(BlockedReason::FROZEN_BUCKET); _nextLid = lid; flush(olid, lid, ageLimit); return true; diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h index 4f97268cd35..e2621bc6118 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h @@ -1,10 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "blockable_maintenance_job.h" #include "document_db_maintenance_config.h" -#include "i_maintenance_job.h" #include <persistence/spi/types.h> -// #include <vespa/searchlib/common/idocumentmetastore.h> namespace proton { @@ -17,7 +16,7 @@ class IFrozenBucketHandler; * Job that regularly checks whether old removed documents should be * forgotten. */ -class PruneRemovedDocumentsJob : public IMaintenanceJob +class PruneRemovedDocumentsJob : public BlockableMaintenanceJob { private: const IDocumentMetaStore &_metaStore; // external ownership |