diff options
Diffstat (limited to 'searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp')
-rw-r--r-- | searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp | 247 |
1 files changed, 247 insertions, 0 deletions
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp new file mode 100644 index 00000000000..bac793fc6e5 --- /dev/null +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp @@ -0,0 +1,247 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "lid_space_jobtest.h" +#include <vespa/searchcore/proton/server/lid_space_compaction_job.h> +#include <vespa/searchcore/proton/server/lid_space_compaction_job_take2.h> +#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h> + +using BlockedReason = IBlockableMaintenanceJob::BlockedReason; + +struct MyDirectJobRunner : public IMaintenanceJobRunner { + IMaintenanceJob &_job; + explicit MyDirectJobRunner(IMaintenanceJob &job) + : _job(job) + { + _job.registerRunner(this); + } + void run() override { _job.run(); } +}; + +struct MyCountJobRunner : public IMaintenanceJobRunner { + uint32_t runCnt; + explicit MyCountJobRunner(IMaintenanceJob &job) : runCnt(0) { + job.registerRunner(this); + } + void run() override { ++runCnt; } +}; + +JobTestBase::JobTestBase() + : _handler(), + _storer(), + _frozenHandler(), + _diskMemUsageNotifier(), + _clusterStateHandler(), + _job() +{ + init(ALLOWED_LID_BLOAT, ALLOWED_LID_BLOAT_FACTOR, RESOURCE_LIMIT_FACTOR, JOB_DELAY, false, MAX_OUTSTANDING_MOVE_OPS); +} + +void +JobTestBase::init(uint32_t allowedLidBloat, + double allowedLidBloatFactor, + double resourceLimitFactor, + vespalib::duration interval, + bool nodeRetired, + uint32_t maxOutstandingMoveOps) +{ + _handler = std::make_unique<MyHandler>(maxOutstandingMoveOps != MAX_OUTSTANDING_MOVE_OPS, useBucketDB()); + DocumentDBLidSpaceCompactionConfig compactCfg(interval, allowedLidBloat, allowedLidBloatFactor, + REMOVE_BATCH_BLOCK_RATE, REMOVE_BLOCK_RATE, false); + BlockableMaintenanceJobConfig blockableCfg(resourceLimitFactor, maxOutstandingMoveOps); + + if (useBucketDB()) { + _bucketExecutor = std::make_unique<storage::spi::dummy::DummyBucketExecutor>(4); + _job = std::make_unique<lidspace::CompactionJob>(compactCfg, *_handler, _storer, *_bucketExecutor, _diskMemUsageNotifier, + blockableCfg, _clusterStateHandler, nodeRetired, + document::BucketSpace::placeHolder()); + } else { + _job = std::make_unique<LidSpaceCompactionJob>(compactCfg, *_handler, _storer, _frozenHandler, _diskMemUsageNotifier, + blockableCfg, _clusterStateHandler, nodeRetired); + } +} +void +JobTestBase::sync() const { + if (_bucketExecutor) { + _bucketExecutor->sync(); + } +} + +JobTestBase & +JobTestBase::addStats(uint32_t docIdLimit, const LidVector &usedLids, const LidPairVector &usedFreePairs) +{ + return addMultiStats(docIdLimit, {usedLids}, usedFreePairs); +} +JobTestBase & +JobTestBase::addMultiStats(uint32_t docIdLimit, + const std::vector<LidVector> &usedLidsVector, + const LidPairVector &usedFreePairs) { + uint32_t usedLids = usedLidsVector[0].size(); + for (auto pair : usedFreePairs) { + uint32_t highestUsedLid = pair.first; + uint32_t lowestFreeLid = pair.second; + _handler->_stats.emplace_back(docIdLimit, usedLids, lowestFreeLid, highestUsedLid); + } + _handler->_lids = usedLidsVector; + return *this; +} +JobTestBase & +JobTestBase::addStats(uint32_t docIdLimit, + uint32_t numDocs, + uint32_t lowestFreeLid, + uint32_t highestUsedLid) { + _handler->_stats.emplace_back(docIdLimit, numDocs, lowestFreeLid, highestUsedLid); + return *this; +} +bool +JobTestBase::run() const { + return _job->run(); +} +JobTestBase & +JobTestBase::endScan() { + EXPECT_FALSE(run()); + return *this; +} +JobTestBase & +JobTestBase::compact() { + EXPECT_TRUE(run()); + return *this; +} +void +JobTestBase::notifyNodeRetired(bool nodeRetired) { + test::BucketStateCalculator::SP calc = std::make_shared<test::BucketStateCalculator>(); + calc->setNodeRetired(nodeRetired); + _clusterStateHandler.notifyClusterStateChanged(calc); +} +void +JobTestBase::assertJobContext(uint32_t moveToLid, + uint32_t moveFromLid, + uint32_t handleMoveCnt, + uint32_t wantedLidLimit, + uint32_t compactStoreCnt) const +{ + sync(); + EXPECT_EQ(moveToLid, _handler->_moveToLid); + EXPECT_EQ(moveFromLid, _handler->_moveFromLid); + EXPECT_EQ(handleMoveCnt, _handler->_handleMoveCnt); + EXPECT_EQ(handleMoveCnt, _storer._moveCnt); + EXPECT_EQ(wantedLidLimit, _handler->_wantedLidLimit); + EXPECT_EQ(compactStoreCnt, _storer._compactCnt); +} +void +JobTestBase::assertNoWorkDone() const { + assertJobContext(0, 0, 0, 0, 0); +} +JobTestBase & +JobTestBase::setupOneDocumentToCompact() { + addStats(10, {1,3,4,5,6,9}, + {{9,2}, // 30% bloat: move 9 -> 2 + {6,7}}); // no documents to move + return *this; +} +void +JobTestBase::assertOneDocumentCompacted() { + assertJobContext(2, 9, 1, 0, 0); + endScan().compact(); + assertJobContext(2, 9, 1, 7, 1); +} +JobTestBase & +JobTestBase::setupThreeDocumentsToCompact() { + addStats(10, {1,5,6,9,8,7}, + {{9,2}, // 30% bloat: move 9 -> 2 + {8,3}, // move 8 -> 3 + {7,4}, // move 7 -> 4 + {6,7}}); // no documents to move + return *this; +} + +JobTestBase::~JobTestBase() { + _handler->clearMoveDoneContexts(); +} + +JobTest::JobTest() + : JobTestBase(), + _jobRunner(std::make_unique<MyDirectJobRunner>(*_job)) +{} +JobTest::~JobTest() = default; +void +JobTest::init(uint32_t allowedLidBloat, + double allowedLidBloatFactor, + double resourceLimitFactor, + vespalib::duration interval, + bool nodeRetired, + uint32_t maxOutstandingMoveOps) +{ + JobTestBase::init(allowedLidBloat, allowedLidBloatFactor, resourceLimitFactor, interval, nodeRetired, maxOutstandingMoveOps); + _jobRunner = std::make_unique<MyDirectJobRunner>(*_job); +} +void +JobTest::init_with_interval(vespalib::duration interval) { + init(ALLOWED_LID_BLOAT, ALLOWED_LID_BLOAT_FACTOR, RESOURCE_LIMIT_FACTOR, interval); +} +void +JobTest::init_with_node_retired(bool retired) { + init(ALLOWED_LID_BLOAT, ALLOWED_LID_BLOAT_FACTOR, RESOURCE_LIMIT_FACTOR, JOB_DELAY, retired); +} + + +JobDisabledByRemoveOpsTest::JobDisabledByRemoveOpsTest() : JobTest() {} +JobDisabledByRemoveOpsTest::~JobDisabledByRemoveOpsTest() = default; + +void +JobDisabledByRemoveOpsTest::job_is_disabled_while_remove_ops_are_ongoing(bool remove_batch) { + setupOneDocumentToCompact(); + _handler->run_remove_ops(remove_batch); + EXPECT_TRUE(run()); // job is disabled + assertNoWorkDone(); +} + +void +JobDisabledByRemoveOpsTest::job_becomes_disabled_if_remove_ops_starts(bool remove_batch) { + setupThreeDocumentsToCompact(); + EXPECT_FALSE(run()); // job executed as normal (with more work to do) + assertJobContext(2, 9, 1, 0, 0); + + _handler->run_remove_ops(remove_batch); + EXPECT_TRUE(run()); // job is disabled + assertJobContext(2, 9, 1, 0, 0); +} + +void +JobDisabledByRemoveOpsTest::job_is_re_enabled_when_remove_ops_are_no_longer_ongoing(bool remove_batch) { + job_becomes_disabled_if_remove_ops_starts(remove_batch); + + _handler->stop_remove_ops(remove_batch); + EXPECT_FALSE(run()); // job executed as normal (with more work to do) + assertJobContext(3, 8, 2, 0, 0); +} + +MaxOutstandingJobTest::MaxOutstandingJobTest() + : JobTest(), + runner() +{} +MaxOutstandingJobTest::~MaxOutstandingJobTest() = default; + +void +MaxOutstandingJobTest::init(uint32_t maxOutstandingMoveOps) { + JobTest::init(ALLOWED_LID_BLOAT, ALLOWED_LID_BLOAT_FACTOR, + RESOURCE_LIMIT_FACTOR, JOB_DELAY, false, maxOutstandingMoveOps); + runner = std::make_unique<MyCountJobRunner>(*_job); +} +void +MaxOutstandingJobTest::assertRunToBlocked() { + EXPECT_TRUE(run()); // job becomes blocked as max outstanding limit is reached + EXPECT_TRUE(_job->isBlocked()); + EXPECT_TRUE(_job->isBlocked(BlockedReason::OUTSTANDING_OPS)); +} +void +MaxOutstandingJobTest::assertRunToNotBlocked() { + EXPECT_FALSE(run()); + EXPECT_FALSE(_job->isBlocked()); +} +void +MaxOutstandingJobTest::unblockJob(uint32_t expRunnerCnt) { + _handler->clearMoveDoneContexts(); // unblocks job and try to execute it via runner + EXPECT_EQ(expRunnerCnt, runner->runCnt); + EXPECT_FALSE(_job->isBlocked()); +} + |