diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-04-18 16:25:26 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-04-18 16:25:26 +0200 |
commit | 78946c348e78dc845e61201a12b6f2fc74e8b4d7 (patch) | |
tree | c73402c6d9f3bc088136ac8b08ebc35b1c6a9752 /searchcore | |
parent | a2fd961fa41c2fc620d661e9a1ad44595110c91c (diff) |
Use a generic pool (summary pool) for running generic manitenance task and add testing for correct setup.
Diffstat (limited to 'searchcore')
9 files changed, 87 insertions, 83 deletions
diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index d8565130ff6..d2564c80f11 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -85,7 +85,7 @@ public: std::shared_ptr<BucketDBOwner> bucketDB, const DocTypeName &docTypeName); ~MyDocumentSubDB(); - uint32_t getSubDBId(void) const { return _subDBId; } + uint32_t getSubDBId() const { return _subDBId; } Document::UP getDocument(DocumentIdT lid) const @@ -98,14 +98,14 @@ public: } } - MaintenanceDocumentSubDB getSubDB(void); + MaintenanceDocumentSubDB getSubDB(); void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &op); void handlePut(PutOperation &op); void handleRemove(RemoveOperation &op); void prepareMove(MoveOperation &op); void handleMove(const MoveOperation &op); - uint32_t getNumUsedLids(void) const; - uint32_t getDocumentCount(void) const { return _docs.size(); } + uint32_t getNumUsedLids() const; + uint32_t getDocumentCount() const { return _docs.size(); } void setBucketState(const BucketId &bucket, bool active) { _metaStore.setBucketState(bucket, active); @@ -139,7 +139,7 @@ struct MyDocumentRetriever : public DocumentRetrieverBaseForTest } virtual const document::DocumentTypeRepo & - getDocumentTypeRepo(void) const override + getDocumentTypeRepo() const override { abort(); } @@ -210,7 +210,7 @@ public: ~MyFeedHandler(); bool - isExecutorThread(void); + isExecutorThread(); virtual void handleMove(MoveOperation &op) override; @@ -219,7 +219,7 @@ public: performPruneRemovedDocuments(PruneRemovedDocumentsOperation &op) override; virtual void - heartBeat(void) override; + heartBeat() override; virtual void wipeOldRemovedFields(TimeStamp wipeTimeLimit) override; @@ -228,7 +228,7 @@ public: setSubDBs(const std::vector<MyDocumentSubDB *> &subDBs); SerialNum - incSerialNum(void) + incSerialNum() { return ++_serialNum; } @@ -238,7 +238,7 @@ public: storeOperation(FeedOperation &op) override; uint32_t - getHeartBeats(void) + getHeartBeats() { return _heartBeats; } @@ -256,16 +256,12 @@ class MyExecutor: public vespalib::ThreadStackExecutor public: FastOS_ThreadId _threadId; - MyExecutor(void); + MyExecutor(); - virtual - ~MyExecutor(void); - - bool - isIdle(void); + ~MyExecutor(); - bool - waitIdle(double timeout); + bool isIdle(); + bool waitIdle(double timeout); }; @@ -284,7 +280,7 @@ public: _freezer.freezeBucket(_bucketId); } - ~MyFrozenBucket(void) + ~MyFrozenBucket() { _freezer.thawBucket(_bucketId); } @@ -447,13 +443,13 @@ public: test::DiskMemUsageNotifier _diskMemUsageNotifier; MaintenanceController _mc; - MaintenanceControllerFixture(void); + MaintenanceControllerFixture(); virtual - ~MaintenanceControllerFixture(void); + ~MaintenanceControllerFixture(); void - syncSubDBs(void); + syncSubDBs(); void commit() override { } @@ -462,30 +458,30 @@ public: } void - performSyncSubDBs(void); + performSyncSubDBs(); void - notifyClusterStateChanged(void); + notifyClusterStateChanged(); void - performNotifyClusterStateChanged(void); + performNotifyClusterStateChanged(); void - startMaintenance(void); + startMaintenance(); void injectMaintenanceJobs(); void - performStartMaintenance(void); + performStartMaintenance(); void - stopMaintenance(void); + stopMaintenance(); void - forwardMaintenanceConfig(void); + forwardMaintenanceConfig(); void - performForwardMaintenanceConfig(void); + performForwardMaintenanceConfig(); void insertDocs(const test::UserDocuments &docs, @@ -605,7 +601,7 @@ public: MaintenanceDocumentSubDB -MyDocumentSubDB::getSubDB(void) +MyDocumentSubDB::getSubDB() { IDocumentRetriever::SP retriever(new MyDocumentRetriever(*this)); @@ -780,7 +776,7 @@ MyDocumentSubDB::handleMove(const MoveOperation &op) uint32_t -MyDocumentSubDB::getNumUsedLids(void) const +MyDocumentSubDB::getNumUsedLids() const { return _metaStore.getNumUsedLids(); } @@ -799,13 +795,13 @@ MyFeedHandler::MyFeedHandler(FastOS_ThreadId &executorThreadId) } -MyFeedHandler::~MyFeedHandler(void) +MyFeedHandler::~MyFeedHandler() { } bool -MyFeedHandler::isExecutorThread(void) +MyFeedHandler::isExecutorThread() { FastOS_ThreadId threadId(FastOS_Thread::GetCurrentThreadId()); return FastOS_Thread::CompareThreadIds(_executorThreadId, threadId); @@ -844,7 +840,7 @@ MyFeedHandler::performPruneRemovedDocuments(PruneRemovedDocumentsOperation &op) void -MyFeedHandler::heartBeat(void) +MyFeedHandler::heartBeat() { assert(isExecutorThread()); ++_heartBeats; @@ -873,7 +869,7 @@ MyFeedHandler::storeOperation(FeedOperation &op) } -MyExecutor::MyExecutor(void) +MyExecutor::MyExecutor() : vespalib::ThreadStackExecutor(1, 128 * 1024), _threadId() { @@ -882,13 +878,13 @@ MyExecutor::MyExecutor(void) } -MyExecutor::~MyExecutor(void) +MyExecutor::~MyExecutor() { } bool -MyExecutor::isIdle(void) +MyExecutor::isIdle() { (void) getStats(); sync(); @@ -912,7 +908,7 @@ MyExecutor::waitIdle(double timeout) } -MaintenanceControllerFixture::MaintenanceControllerFixture(void) +MaintenanceControllerFixture::MaintenanceControllerFixture() : _executor(), _threadService(_executor), _docTypeName("searchdocument"), // must match document builder @@ -936,7 +932,7 @@ MaintenanceControllerFixture::MaintenanceControllerFixture(void) _readyAttributeManager(std::make_shared<MyAttributeManager>()), _notReadyAttributeManager(std::make_shared<MyAttributeManager>()), _attributeUsageFilter(), - _mc(_threadService, _docTypeName) + _mc(_threadService, _executor, _docTypeName) { std::vector<MyDocumentSubDB *> subDBs; subDBs.push_back(&_ready); @@ -947,14 +943,14 @@ MaintenanceControllerFixture::MaintenanceControllerFixture(void) } -MaintenanceControllerFixture::~MaintenanceControllerFixture(void) +MaintenanceControllerFixture::~MaintenanceControllerFixture() { stopMaintenance(); } void -MaintenanceControllerFixture::syncSubDBs(void) +MaintenanceControllerFixture::syncSubDBs() { _executor.execute(makeTask(makeClosure(this, &MaintenanceControllerFixture:: @@ -964,7 +960,7 @@ MaintenanceControllerFixture::syncSubDBs(void) void -MaintenanceControllerFixture::performSyncSubDBs(void) +MaintenanceControllerFixture::performSyncSubDBs() { _mc.syncSubDBs(_ready.getSubDB(), _removed.getSubDB(), @@ -973,7 +969,7 @@ MaintenanceControllerFixture::performSyncSubDBs(void) void -MaintenanceControllerFixture::notifyClusterStateChanged(void) +MaintenanceControllerFixture::notifyClusterStateChanged() { _executor.execute(makeTask(makeClosure(this, &MaintenanceControllerFixture:: @@ -983,14 +979,14 @@ MaintenanceControllerFixture::notifyClusterStateChanged(void) void -MaintenanceControllerFixture::performNotifyClusterStateChanged(void) +MaintenanceControllerFixture::performNotifyClusterStateChanged() { _clusterStateHandler.notifyClusterStateChanged(_calc); } void -MaintenanceControllerFixture::startMaintenance(void) +MaintenanceControllerFixture::startMaintenance() { _executor.execute(makeTask(makeClosure(this, &MaintenanceControllerFixture:: @@ -1015,7 +1011,7 @@ MaintenanceControllerFixture::injectMaintenanceJobs() } void -MaintenanceControllerFixture::performStartMaintenance(void) +MaintenanceControllerFixture::performStartMaintenance() { injectMaintenanceJobs(); _mc.start(_mcCfg); @@ -1023,7 +1019,7 @@ MaintenanceControllerFixture::performStartMaintenance(void) void -MaintenanceControllerFixture::stopMaintenance(void) +MaintenanceControllerFixture::stopMaintenance() { _mc.stop(); _executor.sync(); @@ -1031,7 +1027,7 @@ MaintenanceControllerFixture::stopMaintenance(void) void -MaintenanceControllerFixture::forwardMaintenanceConfig(void) +MaintenanceControllerFixture::forwardMaintenanceConfig() { _executor.execute(makeTask(makeClosure(this, &MaintenanceControllerFixture:: @@ -1041,7 +1037,7 @@ MaintenanceControllerFixture::forwardMaintenanceConfig(void) void -MaintenanceControllerFixture::performForwardMaintenanceConfig(void) +MaintenanceControllerFixture::performForwardMaintenanceConfig() { _mc.killJobs(); injectMaintenanceJobs(); @@ -1464,6 +1460,15 @@ containsJob(const MaintenanceController::JobList &jobs, const vespalib::string & return itr != jobs.end(); } +bool +containsJobAndExecutedBy(const MaintenanceController::JobList &jobs, const vespalib::string &jobName, + const vespalib::Executor & executor) +{ + auto itr = std::find_if(jobs.begin(), jobs.end(), + [&](const auto &job){ return job->getJob().getName() == jobName; }); + return itr != jobs.end() && (&(*itr)->getExecutor() == &executor); +} + TEST_F("require that lid space compaction jobs can be disabled", MaintenanceControllerFixture) { f._lscHandlers.push_back(std::make_unique<MockLidSpaceCompactionHandler>("my_handler")); @@ -1481,6 +1486,19 @@ TEST_F("require that lid space compaction jobs can be disabled", MaintenanceCont } } +TEST_F("Require that maintenance jobs are run by correct executor", MaintenanceControllerFixture) +{ + f.injectMaintenanceJobs(); + auto jobs = f._mc.getJobList(); + EXPECT_EQUAL(6u, jobs.size()); + EXPECT_TRUE(containsJobAndExecutedBy(jobs, "heart_beat", f._threadService)); + EXPECT_TRUE(containsJobAndExecutedBy(jobs, "prune_session_cache", f._executor)); + EXPECT_TRUE(containsJobAndExecutedBy(jobs, "wipe_old_removed_fields", f._threadService)); + EXPECT_TRUE(containsJobAndExecutedBy(jobs, "prune_removed_documents.searchdocument", f._threadService)); + EXPECT_TRUE(containsJobAndExecutedBy(jobs, "move_buckets.searchdocument", f._threadService)); + EXPECT_TRUE(containsJobAndExecutedBy(jobs, "sample_attribute_usage.searchdocument", f._threadService)); +} + TEST_MAIN() { TEST_RUN_ALL(); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 0d74b015754..8c3977d675f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -165,7 +165,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _baseDir, protonCfg, hwInfo), - _maintenanceController(_writeService.master(), _docTypeName), + _maintenanceController(_writeService.master(), summaryExecutor, _docTypeName), _visibility(_feedHandler, _writeService, _feedView), _lidSpaceCompactionHandlers(), _jobTrackers(), diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb_commit_job.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb_commit_job.cpp index b3293eb2852..d2dace3c08f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb_commit_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb_commit_job.cpp @@ -1,6 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fastos/fastos.h> + #include "documentdb_commit_job.h" +#include "icommitable.h" namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb_commit_job.h b/searchcore/src/vespa/searchcore/proton/server/documentdb_commit_job.h index 84ebc463dc4..e42dcdef070 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb_commit_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb_commit_job.h @@ -2,10 +2,12 @@ #pragma once #include "i_maintenance_job.h" -#include "icommitable.h" +#include <vespa/fastos/timestamp.h> namespace proton { +class ICommitable; + /** * Job that regularly commits the documentdb. */ diff --git a/searchcore/src/vespa/searchcore/proton/server/ibucketstatecalculator.h b/searchcore/src/vespa/searchcore/proton/server/ibucketstatecalculator.h index 0fb3c773471..3c35ff65c89 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ibucketstatecalculator.h +++ b/searchcore/src/vespa/searchcore/proton/server/ibucketstatecalculator.h @@ -2,33 +2,20 @@ #pragma once -namespace document -{ - -class BucketId; +#include <memory> -} +namespace document { class BucketId; } -namespace proton -{ +namespace proton { struct IBucketStateCalculator { typedef std::shared_ptr<IBucketStateCalculator> SP; virtual bool shouldBeReady(const document::BucketId &bucket) const = 0; - - virtual bool - clusterUp(void) const = 0; - - virtual bool - nodeUp(void) const = 0; - - virtual bool - nodeInitializing() const = 0; - + virtual bool clusterUp() const = 0; + virtual bool nodeUp() const = 0; + virtual bool nodeInitializing() const = 0; virtual ~IBucketStateCalculator() {} }; - } // namespace proton - diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index 972542fc473..9217913e6a6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -1,11 +1,6 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fastos/fastos.h> -#include <vespa/log/log.h> -LOG_SETUP(".proton.server.maintenance_jobs_injector"); - #include "bucketmovejob.h" -#include "documentbucketmover.h" #include "documentdb_commit_job.h" #include "heart_beat_job.h" #include "job_tracked_maintenance_job.h" @@ -15,7 +10,6 @@ LOG_SETUP(".proton.server.maintenance_jobs_injector"); #include "pruneremoveddocumentsjob.h" #include "sample_attribute_usage_job.h" #include "wipe_old_removed_fields_job.h" -#include <vespa/fastos/timestamp.h> using fastos::ClockSystem; using fastos::TimeStamp; diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index 7565339a86a..d5bb3bf92a7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -31,9 +31,11 @@ public: } MaintenanceController::MaintenanceController(IThreadService &masterThread, + vespalib::Executor & defaultExecutor, const DocTypeName &docTypeName) : IBucketFreezeListener(), _masterThread(masterThread), + _defaultExecutor(defaultExecutor), _readySubDB(), _remSubDB(), _notReadySubDB(), @@ -66,7 +68,7 @@ void MaintenanceController::registerJobInDefaultPool(IMaintenanceJob::UP job) { // Called by master write thread - registerJob(_masterThread, std::move(job)); + registerJob(_defaultExecutor, std::move(job)); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index 7e49b0b10b6..59ddd051679 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -33,7 +33,7 @@ public: using JobList = std::vector<std::shared_ptr<MaintenanceJobRunner>>; using UP = std::unique_ptr<MaintenanceController>; - MaintenanceController(IThreadService &masterThread, const DocTypeName &docTypeName); + MaintenanceController(IThreadService &masterThread, vespalib::Executor & defaultExecutor, const DocTypeName &docTypeName); virtual ~MaintenanceController(); void registerJobInMasterThread(IMaintenanceJob::UP job); @@ -72,6 +72,7 @@ private: using Guard = std::lock_guard<Mutex>; IThreadService &_masterThread; + vespalib::Executor &_defaultExecutor; MaintenanceDocumentSubDB _readySubDB; MaintenanceDocumentSubDB _remSubDB; MaintenanceDocumentSubDB _notReadySubDB; diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.h b/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.h index 45dee67bb24..e6dfcd78442 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.h @@ -7,8 +7,7 @@ #include <vespa/vespalib/util/executor.h> #include <mutex> -namespace proton -{ +namespace proton { class MaintenanceJobRunner : public IMaintenanceJobRunner { @@ -30,12 +29,12 @@ public: typedef std::shared_ptr<MaintenanceJobRunner> SP; MaintenanceJobRunner(vespalib::Executor &executor, IMaintenanceJob::UP job); - virtual void run() override; + void run() override; void stop() { _stopped = true; } bool isRunning() const; + const vespalib::Executor & getExecutor() const { return _executor; } const IMaintenanceJob &getJob() const { return *_job; } IMaintenanceJob &getJob() { return *_job; } }; } // namespace proton - |