diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-04-10 13:43:22 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-10 13:43:22 +0200 |
commit | 8ac23b7631bf41a61daf703bcc2a1edeaf4ff58b (patch) | |
tree | ea59761dc07c51080136eb9e7d4a803bf8012057 | |
parent | 2b54944bd62c757813b900c06cc10c20bddd7867 (diff) | |
parent | 4731819e1378098d56cda6d6d8dad522f737b241 (diff) |
Merge pull request #17353 from vespa-engine/balder/avoid-blocking-during-stop
Use shared_ptr to for bucket move job to avoid blocking duirng reconfig.
4 files changed, 269 insertions, 298 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp index 85f80293377..bb7180dadf1 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -72,7 +72,7 @@ struct ControllerFixtureBase : public ::testing::Test MyFrozenBucketHandler _fbh; BucketCreateNotifier _bucketCreateNotifier; test::DiskMemUsageNotifier _diskMemUsageNotifier; - BucketMoveJob _bmj; + std::shared_ptr<BucketMoveJob> _bmj; MyCountJobRunner _runner; ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts); ~ControllerFixtureBase(); @@ -97,7 +97,7 @@ struct ControllerFixtureBase : public ::testing::Test } ControllerFixtureBase &remFrozen(const BucketId &bucket) { _fbh.remFrozen(bucket); - _bmj.notifyThawedBucket(bucket); + _bmj->notifyThawedBucket(bucket); return *this; } ControllerFixtureBase &activateBucket(const BucketId &bucket) { @@ -120,7 +120,7 @@ struct ControllerFixtureBase : public ::testing::Test return _calc->asked(); } void runLoop() { - while (!_bmj.isBlocked() && !_bmj.run()) { + while (!_bmj->isBlocked() && !_bmj->run()) { } } }; @@ -137,11 +137,10 @@ ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig _fbh(), _bucketCreateNotifier(), _diskMemUsageNotifier(), - _bmj(_calc, _moveHandler, _modifiedHandler, _ready._subDb, - _notReady._subDb, _fbh, _bucketCreateNotifier, _clusterStateHandler, _bucketHandler, - _diskMemUsageNotifier, blockableConfig, - "test", makeBucketSpace()), - _runner(_bmj) + _bmj(std::make_shared<BucketMoveJob>(_calc, _moveHandler, _modifiedHandler, _ready._subDb, _notReady._subDb, + _fbh, _bucketCreateNotifier, _clusterStateHandler, _bucketHandler, + _diskMemUsageNotifier, blockableConfig, "test", makeBucketSpace())), + _runner(*_bmj) { } @@ -179,11 +178,11 @@ struct OnlyReadyControllerFixture : public ControllerFixtureBase TEST_F(ControllerFixture, require_that_nothing_is_moved_if_bucket_state_says_so) { - EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj->done()); addReady(_ready.bucket(1)); addReady(_ready.bucket(2)); - _bmj.scanAndMove(4, 3); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(4, 3); + EXPECT_TRUE(_bmj->done()); EXPECT_TRUE(docsMoved().empty()); EXPECT_TRUE(bucketsModified().empty()); } @@ -194,8 +193,8 @@ TEST_F(ControllerFixture, require_that_not_ready_bucket_is_moved_to_ready_if_buc addReady(_ready.bucket(1)); addReady(_ready.bucket(2)); addReady(_notReady.bucket(4)); - _bmj.scanAndMove(4, 3); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(4, 3); + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(3u, docsMoved().size()); assertEqual(_notReady.bucket(4), _notReady.docs(4)[0], 2, 1, docsMoved()[0]); assertEqual(_notReady.bucket(4), _notReady.docs(4)[1], 2, 1, docsMoved()[1]); @@ -208,8 +207,8 @@ TEST_F(ControllerFixture, require_that_ready_bucket_is_moved_to_not_ready_if_buc { // bucket 2 should be moved addReady(_ready.bucket(1)); - _bmj.scanAndMove(4, 3); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(4, 3); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(2u, docsMoved().size()); assertEqual(_ready.bucket(2), _ready.docs(2)[0], 1, 2, docsMoved()[0]); assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]); @@ -225,14 +224,14 @@ TEST_F(ControllerFixture, require_that_maxBucketsToScan_is_taken_into_considerat addReady(_notReady.bucket(4)); // buckets 1, 2, and 3 considered - _bmj.scanAndMove(3, 3); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(3, 3); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); // move bucket 4 - _bmj.scanAndMove(1, 4); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(1, 4); + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(3u, docsMoved().size()); assertEqual(_notReady.bucket(4), _notReady.docs(4)[0], 2, 1, docsMoved()[0]); assertEqual(_notReady.bucket(4), _notReady.docs(4)[1], 2, 1, docsMoved()[1]); @@ -249,14 +248,14 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps) addReady(_notReady.bucket(4)); // consider move bucket 1 - _bmj.scanAndMove(1, 2); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(1, 2); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); // move bucket 2, docs 1,2 - _bmj.scanAndMove(1, 2); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(1, 2); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(2u, docsMoved().size()); EXPECT_TRUE(assertEqual(_ready.bucket(2), _ready.docs(2)[0], 1, 2, docsMoved()[0])); EXPECT_TRUE(assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1])); @@ -264,8 +263,8 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps) EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); // move bucket 3, docs 1,2 - _bmj.scanAndMove(1, 2); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(1, 2); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(4u, docsMoved().size()); EXPECT_TRUE(assertEqual(_notReady.bucket(3), _notReady.docs(3)[0], 2, 1, docsMoved()[2])); EXPECT_TRUE(assertEqual(_notReady.bucket(3), _notReady.docs(3)[1], 2, 1, docsMoved()[3])); @@ -273,16 +272,16 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps) EXPECT_EQ(_notReady.bucket(3), bucketsModified()[1]); // move bucket 4, docs 1,2 - _bmj.scanAndMove(1, 2); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(1, 2); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(6u, docsMoved().size()); EXPECT_TRUE(assertEqual(_notReady.bucket(4), _notReady.docs(4)[0], 2, 1, docsMoved()[4])); EXPECT_TRUE(assertEqual(_notReady.bucket(4), _notReady.docs(4)[1], 2, 1, docsMoved()[5])); EXPECT_EQ(2u, bucketsModified().size()); // move bucket 4, docs 3 - _bmj.scanAndMove(1, 2); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(1, 2); + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(7u, docsMoved().size()); EXPECT_TRUE(assertEqual(_notReady.bucket(4), _notReady.docs(4)[2], 2, 1, docsMoved()[6])); EXPECT_EQ(3u, bucketsModified().size()); @@ -297,10 +296,10 @@ TEST_F(ControllerFixture, require_that_we_can_change_calculator_and_continue_sca addReady(_ready.bucket(2)); // start with bucket2 - _bmj.scanAndMove(1, 0); + _bmj->scanAndMove(1, 0); changeCalc(); - _bmj.scanAndMove(5, 0); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(5, 0); + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(4u, calcAsked().size()); EXPECT_EQ(_ready.bucket(2), calcAsked()[0]); EXPECT_EQ(_notReady.bucket(3), calcAsked()[1]); @@ -309,10 +308,10 @@ TEST_F(ControllerFixture, require_that_we_can_change_calculator_and_continue_sca // start with bucket3 changeCalc(); - _bmj.scanAndMove(2, 0); + _bmj->scanAndMove(2, 0); changeCalc(); - _bmj.scanAndMove(5, 0); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(5, 0); + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(4u, calcAsked().size()); EXPECT_EQ(_notReady.bucket(3), calcAsked()[0]); EXPECT_EQ(_notReady.bucket(4), calcAsked()[1]); @@ -321,10 +320,10 @@ TEST_F(ControllerFixture, require_that_we_can_change_calculator_and_continue_sca // start with bucket4 changeCalc(); - _bmj.scanAndMove(3, 0); + _bmj->scanAndMove(3, 0); changeCalc(); - _bmj.scanAndMove(5, 0); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(5, 0); + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(4u, calcAsked().size()); EXPECT_EQ(_notReady.bucket(4), calcAsked()[0]); EXPECT_EQ(_ready.bucket(1), calcAsked()[1]); @@ -333,8 +332,8 @@ TEST_F(ControllerFixture, require_that_we_can_change_calculator_and_continue_sca // start with bucket1 changeCalc(); - _bmj.scanAndMove(5, 0); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(5, 0); + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(4u, calcAsked().size()); EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); EXPECT_EQ(_ready.bucket(2), calcAsked()[1]); @@ -343,16 +342,16 @@ TEST_F(ControllerFixture, require_that_we_can_change_calculator_and_continue_sca // change calc in second pass changeCalc(); - _bmj.scanAndMove(3, 0); + _bmj->scanAndMove(3, 0); changeCalc(); - _bmj.scanAndMove(2, 0); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(2, 0); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(2u, calcAsked().size()); EXPECT_EQ(_notReady.bucket(4), calcAsked()[0]); EXPECT_EQ(_ready.bucket(1), calcAsked()[1]); changeCalc(); - _bmj.scanAndMove(5, 0); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(5, 0); + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(4u, calcAsked().size()); EXPECT_EQ(_ready.bucket(2), calcAsked()[0]); EXPECT_EQ(_notReady.bucket(3), calcAsked()[1]); @@ -361,22 +360,22 @@ TEST_F(ControllerFixture, require_that_we_can_change_calculator_and_continue_sca // check 1 bucket at a time, start with bucket2 changeCalc(); - _bmj.scanAndMove(1, 0); + _bmj->scanAndMove(1, 0); changeCalc(); - _bmj.scanAndMove(1, 0); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(1, 0); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(1u, calcAsked().size()); EXPECT_EQ(_ready.bucket(2), calcAsked()[0]); - _bmj.scanAndMove(1, 0); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(1, 0); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(2u, calcAsked().size()); EXPECT_EQ(_notReady.bucket(3), calcAsked()[1]); - _bmj.scanAndMove(1, 0); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(1, 0); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(3u, calcAsked().size()); EXPECT_EQ(_notReady.bucket(4), calcAsked()[2]); - _bmj.scanAndMove(1, 0); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(1, 0); + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(4u, calcAsked().size()); EXPECT_EQ(_ready.bucket(1), calcAsked()[3]); } @@ -385,16 +384,16 @@ TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_w { // bucket 1 should be moved addReady(_ready.bucket(2)); - _bmj.scanAndMove(3, 1); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(3, 1); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(1u, docsMoved().size()); EXPECT_EQ(1u, calcAsked().size()); changeCalc(); // Not cancelled, bucket 1 still moving to notReady EXPECT_EQ(1u, calcAsked().size()); EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); _calc->resetAsked(); - _bmj.scanAndMove(2, 1); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(2, 1); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(1u, docsMoved().size()); EXPECT_EQ(0u, calcAsked().size()); addReady(_ready.bucket(1)); @@ -406,13 +405,13 @@ TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_w changeCalc(); // not cancelled. No active bucket move EXPECT_EQ(0u, calcAsked().size()); _calc->resetAsked(); - _bmj.scanAndMove(2, 1); + _bmj->scanAndMove(2, 1); EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(2u, calcAsked().size()); EXPECT_EQ(_ready.bucket(2), calcAsked()[0]); EXPECT_EQ(_notReady.bucket(3), calcAsked()[1]); - _bmj.scanAndMove(2, 3); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(2, 3); + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(3u, docsMoved().size()); EXPECT_EQ(4u, calcAsked().size()); EXPECT_EQ(_notReady.bucket(4), calcAsked()[2]); @@ -425,12 +424,12 @@ TEST_F(ControllerFixture, require_that_last_bucket_is_moved_before_reporting_don addReady(_ready.bucket(1)); addReady(_ready.bucket(2)); addReady(_notReady.bucket(4)); - _bmj.scanAndMove(4, 1); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(4, 1); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(1u, docsMoved().size()); EXPECT_EQ(4u, calcAsked().size()); - _bmj.scanAndMove(0, 2); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(0, 2); + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(3u, docsMoved().size()); EXPECT_EQ(4u, calcAsked().size()); } @@ -440,13 +439,13 @@ TEST_F(ControllerFixture, require_that_frozen_bucket_is_not_moved_until_thawed) // bucket 1 should be moved but is frozen addReady(_ready.bucket(2)); addFrozen(_ready.bucket(1)); - _bmj.scanAndMove(4, 3); // scan all, delay frozen bucket 1 + _bmj->scanAndMove(4, 3); // scan all, delay frozen bucket 1 remFrozen(_ready.bucket(1)); - EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); - _bmj.scanAndMove(0, 3); // move delayed and thawed bucket 1 - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(0, 3); // move delayed and thawed bucket 1 + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(3u, docsMoved().size()); EXPECT_EQ(1u, bucketsModified().size()); EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]); @@ -460,19 +459,19 @@ TEST_F(ControllerFixture, require_that_thawed_bucket_is_moved_before_other_bucke addReady(_notReady.bucket(3)); addReady(_notReady.bucket(4)); addFrozen(_ready.bucket(2)); - _bmj.scanAndMove(3, 2); // delay bucket 2, move bucket 3 + _bmj->scanAndMove(3, 2); // delay bucket 2, move bucket 3 remFrozen(_ready.bucket(2)); - EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(2u, docsMoved().size()); EXPECT_EQ(1u, bucketsModified().size()); EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]); - _bmj.scanAndMove(2, 2); // move thawed bucket 2 - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(2, 2); // move thawed bucket 2 + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(4u, docsMoved().size()); EXPECT_EQ(2u, bucketsModified().size()); EXPECT_EQ(_ready.bucket(2), bucketsModified()[1]); - _bmj.scanAndMove(1, 4); // move bucket 4 - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(1, 4); // move bucket 4 + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(7u, docsMoved().size()); EXPECT_EQ(3u, bucketsModified().size()); EXPECT_EQ(_notReady.bucket(4), bucketsModified()[2]); @@ -483,31 +482,31 @@ TEST_F(ControllerFixture, require_that_re_frozen_thawed_bucket_is_not_moved_unti // bucket 1 should be moved but is re-frozen addReady(_ready.bucket(2)); addFrozen(_ready.bucket(1)); - _bmj.scanAndMove(1, 0); // scan, delay frozen bucket 1 - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(1, 0); // scan, delay frozen bucket 1 + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); EXPECT_EQ(1u, calcAsked().size()); EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); remFrozen(_ready.bucket(1)); addFrozen(_ready.bucket(1)); - _bmj.scanAndMove(1, 0); // scan, but nothing to move - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(1, 0); // scan, but nothing to move + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); EXPECT_EQ(3u, calcAsked().size()); EXPECT_EQ(_ready.bucket(1), calcAsked()[1]); EXPECT_EQ(_ready.bucket(2), calcAsked()[2]); remFrozen(_ready.bucket(1)); - _bmj.scanAndMove(3, 4); // move delayed and thawed bucket 1 - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(3, 4); // move delayed and thawed bucket 1 + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(3u, docsMoved().size()); EXPECT_EQ(1u, bucketsModified().size()); EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]); EXPECT_EQ(4u, calcAsked().size()); EXPECT_EQ(_ready.bucket(1), calcAsked()[3]); - _bmj.scanAndMove(2, 0); // scan the rest - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(2, 0); // scan the rest + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(3u, docsMoved().size()); EXPECT_EQ(1u, bucketsModified().size()); EXPECT_EQ(6u, calcAsked().size()); @@ -520,15 +519,15 @@ TEST_F(ControllerFixture, require_that_thawed_bucket_is_not_moved_if_new_calcula addReady(_ready.bucket(2)); addReady(_notReady.bucket(3)); addFrozen(_notReady.bucket(3)); - _bmj.scanAndMove(4, 3); // scan all, delay frozen bucket 3 + _bmj->scanAndMove(4, 3); // scan all, delay frozen bucket 3 remFrozen(_notReady.bucket(3)); - EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); EXPECT_EQ(4u, calcAsked().size()); changeCalc(); remReady(_notReady.bucket(3)); - _bmj.scanAndMove(0, 3); // consider delayed bucket 3 + _bmj->scanAndMove(0, 3); // consider delayed bucket 3 EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); EXPECT_EQ(1u, calcAsked().size()); @@ -541,8 +540,8 @@ TEST_F(ControllerFixture, require_that_current_bucket_mover_is_cancelled_if_buck addReady(_ready.bucket(1)); addReady(_ready.bucket(2)); addReady(_notReady.bucket(3)); - _bmj.scanAndMove(3, 1); // move 1 doc from bucket 3 - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(3, 1); // move 1 doc from bucket 3 + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(1u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); EXPECT_EQ(3u, calcAsked().size()); @@ -551,22 +550,22 @@ TEST_F(ControllerFixture, require_that_current_bucket_mover_is_cancelled_if_buck EXPECT_EQ(_notReady.bucket(3), calcAsked()[2]); addFrozen(_notReady.bucket(3)); - _bmj.scanAndMove(1, 3); // done scanning - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(1, 3); // done scanning + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(1u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); EXPECT_EQ(3u, calcAsked().size()); - _bmj.scanAndMove(1, 3); // done scanning + _bmj->scanAndMove(1, 3); // done scanning remFrozen(_notReady.bucket(3)); - EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(1u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); EXPECT_EQ(4u, calcAsked().size()); EXPECT_EQ(_notReady.bucket(4), calcAsked()[3]); - _bmj.scanAndMove(0, 2); // move all docs from bucket 3 again - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(0, 2); // move all docs from bucket 3 again + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(3u, docsMoved().size()); EXPECT_EQ(1u, bucketsModified().size()); EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]); @@ -581,14 +580,14 @@ TEST_F(ControllerFixture, require_that_current_bucket_mover_is_not_cancelled_if_ addReady(_ready.bucket(2)); addReady(_notReady.bucket(3)); addReady(_notReady.bucket(4)); - _bmj.scanAndMove(3, 1); // move 1 doc from bucket 3 - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(3, 1); // move 1 doc from bucket 3 + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(1u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); EXPECT_EQ(3u, calcAsked().size()); addFrozen(_notReady.bucket(4)); - _bmj.scanAndMove(1, 2); // move rest of docs from bucket 3 - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(1, 2); // move rest of docs from bucket 3 + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(2u, docsMoved().size()); EXPECT_EQ(1u, bucketsModified().size()); EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]); @@ -600,15 +599,15 @@ TEST_F(ControllerFixture, require_that_active_bucket_is_not_moved_from_ready_to_ // bucket 1 should be moved but is active addReady(_ready.bucket(2)); activateBucket(_ready.bucket(1)); - _bmj.scanAndMove(4, 3); // scan all, delay active bucket 1 - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(4, 3); // scan all, delay active bucket 1 + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); deactivateBucket(_ready.bucket(1)); - EXPECT_FALSE(_bmj.done()); - _bmj.scanAndMove(0, 3); // move delayed and de-activated bucket 1 - EXPECT_TRUE(_bmj.done()); + EXPECT_FALSE(_bmj->done()); + _bmj->scanAndMove(0, 3); // move delayed and de-activated bucket 1 + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(3u, docsMoved().size()); EXPECT_EQ(1u, bucketsModified().size()); EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]); @@ -619,21 +618,21 @@ TEST_F(OnlyReadyControllerFixture, require_that_de_activated_bucket_is_moved_bef // bucket 1, 2, 3 should be moved (but bucket 1 is active) addReady(_ready.bucket(4)); activateBucket(_ready.bucket(1)); - _bmj.scanAndMove(2, 4); // delay bucket 1, move bucket 2 - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(2, 4); // delay bucket 1, move bucket 2 + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(2u, docsMoved().size()); EXPECT_EQ(1u, bucketsModified().size()); EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); deactivateBucket(_ready.bucket(1)); - _bmj.scanAndMove(2, 4); // move de-activated bucket 1 - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(2, 4); // move de-activated bucket 1 + EXPECT_FALSE(_bmj->done()); EXPECT_EQ(3u, docsMoved().size()); EXPECT_EQ(2u, bucketsModified().size()); EXPECT_EQ(_ready.bucket(1), bucketsModified()[1]); - _bmj.scanAndMove(2, 4); // move bucket 3 - // EXPECT_TRUE(_bmj.done()); // TODO(geirst): fix this + _bmj->scanAndMove(2, 4); // move bucket 3 + // EXPECT_TRUE(_bmj->done()); // TODO(geirst): fix this EXPECT_EQ(6u, docsMoved().size()); EXPECT_EQ(3u, bucketsModified().size()); EXPECT_EQ(_ready.bucket(3), bucketsModified()[2]); @@ -644,14 +643,14 @@ TEST_F(ControllerFixture, require_that_de_activated_bucket_is_not_moved_if_new_c // bucket 1 should be moved addReady(_ready.bucket(2)); activateBucket(_ready.bucket(1)); - _bmj.scanAndMove(4, 3); // scan all, delay active bucket 1 + _bmj->scanAndMove(4, 3); // scan all, delay active bucket 1 EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); deactivateBucket(_ready.bucket(1)); addReady(_ready.bucket(1)); changeCalc(); - _bmj.scanAndMove(0, 3); // consider delayed bucket 3 + _bmj->scanAndMove(0, 3); // consider delayed bucket 3 EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); EXPECT_EQ(1u, calcAsked().size()); @@ -663,18 +662,18 @@ TEST_F(ControllerFixture, require_that_de_activated_bucket_is_not_moved_if_froze // bucket 1 should be moved addReady(_ready.bucket(2)); activateBucket(_ready.bucket(1)); - _bmj.scanAndMove(4, 3); // scan all, delay active bucket 1 + _bmj->scanAndMove(4, 3); // scan all, delay active bucket 1 EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); addFrozen(_ready.bucket(1)); deactivateBucket(_ready.bucket(1)); - _bmj.scanAndMove(0, 3); // bucket 1 de-activated but frozen + _bmj->scanAndMove(0, 3); // bucket 1 de-activated but frozen EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); remFrozen(_ready.bucket(1)); - _bmj.scanAndMove(0, 3); // handle thawed bucket 1 + _bmj->scanAndMove(0, 3); // handle thawed bucket 1 EXPECT_EQ(3u, docsMoved().size()); EXPECT_EQ(1u, bucketsModified().size()); EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]); @@ -685,18 +684,18 @@ TEST_F(ControllerFixture, require_that_thawed_bucket_is_not_moved_if_active_as_w // bucket 1 should be moved addReady(_ready.bucket(2)); addFrozen(_ready.bucket(1)); - _bmj.scanAndMove(4, 3); // scan all, delay frozen bucket 1 + _bmj->scanAndMove(4, 3); // scan all, delay frozen bucket 1 EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); activateBucket(_ready.bucket(1)); remFrozen(_ready.bucket(1)); - _bmj.scanAndMove(0, 3); // bucket 1 thawed but active + _bmj->scanAndMove(0, 3); // bucket 1 thawed but active EXPECT_EQ(0u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); deactivateBucket(_ready.bucket(1)); - _bmj.scanAndMove(0, 3); // handle de-activated bucket 1 + _bmj->scanAndMove(0, 3); // handle de-activated bucket 1 EXPECT_EQ(3u, docsMoved().size()); EXPECT_EQ(1u, bucketsModified().size()); EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]); @@ -707,8 +706,8 @@ TEST_F(ControllerFixture, ready_bucket_not_moved_to_not_ready_if_node_is_marked_ _calc->setNodeRetired(true); // Bucket 2 would be moved from ready to not ready in a non-retired case, but not when retired. addReady(_ready.bucket(1)); - _bmj.scanAndMove(4, 3); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(4, 3); + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(0u, docsMoved().size()); } @@ -720,8 +719,8 @@ TEST_F(ControllerFixture, inactive_not_ready_bucket_not_moved_to_ready_if_node_i addReady(_ready.bucket(1)); addReady(_ready.bucket(2)); addReady(_notReady.bucket(3)); - _bmj.scanAndMove(4, 3); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(4, 3); + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(0u, docsMoved().size()); } @@ -732,8 +731,8 @@ TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_rea addReady(_ready.bucket(2)); addReady(_notReady.bucket(3)); activateBucket(_notReady.bucket(3)); - _bmj.scanAndMove(4, 3); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(4, 3); + EXPECT_FALSE(_bmj->done()); ASSERT_EQ(2u, docsMoved().size()); assertEqual(_notReady.bucket(3), _notReady.docs(3)[0], 2, 1, docsMoved()[0]); assertEqual(_notReady.bucket(3), _notReady.docs(3)[1], 2, 1, docsMoved()[1]); @@ -743,19 +742,19 @@ TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_rea TEST_F(ControllerFixture, require_that_notifyCreateBucket_causes_bucket_to_be_reconsidered_by_job) { - EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj->done()); addReady(_ready.bucket(1)); addReady(_ready.bucket(2)); runLoop(); - EXPECT_TRUE(_bmj.done()); + EXPECT_TRUE(_bmj->done()); EXPECT_TRUE(docsMoved().empty()); EXPECT_TRUE(bucketsModified().empty()); addReady(_notReady.bucket(3)); // bucket 3 now ready, no notify - EXPECT_TRUE(_bmj.done()); // move job still believes work done - _bmj.notifyCreateBucket(_bucketDB->takeGuard(), _notReady.bucket(3)); // reconsider bucket 3 - EXPECT_FALSE(_bmj.done()); + EXPECT_TRUE(_bmj->done()); // move job still believes work done + _bmj->notifyCreateBucket(_bucketDB->takeGuard(), _notReady.bucket(3)); // reconsider bucket 3 + EXPECT_FALSE(_bmj->done()); runLoop(); - EXPECT_TRUE(_bmj.done()); + EXPECT_TRUE(_bmj->done()); EXPECT_EQ(1u, bucketsModified().size()); EXPECT_EQ(2u, docsMoved().size()); } @@ -769,18 +768,18 @@ struct ResourceLimitControllerFixture : public ControllerFixture void testJobStopping(DiskMemUsageState blockingUsageState) { // Bucket 1 should be moved addReady(_ready.bucket(2)); - // Note: This depends on _bmj.run() moving max 1 documents - EXPECT_TRUE(!_bmj.run()); + // Note: This depends on _bmj->run() moving max 1 documents + EXPECT_TRUE(!_bmj->run()); EXPECT_EQ(1u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); // Notify that we've over limit _diskMemUsageNotifier.notify(blockingUsageState); - EXPECT_TRUE(_bmj.run()); + EXPECT_TRUE(_bmj->run()); EXPECT_EQ(1u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); // Notify that we've under limit _diskMemUsageNotifier.notify(DiskMemUsageState()); - EXPECT_TRUE(!_bmj.run()); + EXPECT_TRUE(!_bmj->run()); EXPECT_EQ(2u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); } @@ -788,13 +787,13 @@ struct ResourceLimitControllerFixture : public ControllerFixture void testJobNotStopping(DiskMemUsageState blockingUsageState) { // Bucket 1 should be moved addReady(_ready.bucket(2)); - // Note: This depends on _bmj.run() moving max 1 documents - EXPECT_TRUE(!_bmj.run()); + // Note: This depends on _bmj->run() moving max 1 documents + EXPECT_TRUE(!_bmj->run()); EXPECT_EQ(1u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); // Notify that we've over limit, but not over adjusted limit _diskMemUsageNotifier.notify(blockingUsageState); - EXPECT_TRUE(!_bmj.run()); + EXPECT_TRUE(!_bmj->run()); EXPECT_EQ(2u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); } @@ -834,20 +833,20 @@ struct MaxOutstandingMoveOpsFixture : public ControllerFixture } void assertRunToBlocked() { - EXPECT_TRUE(_bmj.run()); // job becomes blocked as max outstanding limit is reached - EXPECT_FALSE(_bmj.done()); - EXPECT_TRUE(_bmj.isBlocked()); - EXPECT_TRUE(_bmj.isBlocked(BlockedReason::OUTSTANDING_OPS)); + EXPECT_TRUE(_bmj->run()); // job becomes blocked as max outstanding limit is reached + EXPECT_FALSE(_bmj->done()); + EXPECT_TRUE(_bmj->isBlocked()); + EXPECT_TRUE(_bmj->isBlocked(BlockedReason::OUTSTANDING_OPS)); } void assertRunToNotBlocked() { - EXPECT_FALSE(_bmj.run()); - EXPECT_FALSE(_bmj.done()); - EXPECT_FALSE(_bmj.isBlocked()); + EXPECT_FALSE(_bmj->run()); + EXPECT_FALSE(_bmj->done()); + EXPECT_FALSE(_bmj->isBlocked()); } void assertRunToFinished() { - EXPECT_TRUE(_bmj.run()); - EXPECT_TRUE(_bmj.done()); - EXPECT_FALSE(_bmj.isBlocked()); + EXPECT_TRUE(_bmj->run()); + EXPECT_TRUE(_bmj->done()); + EXPECT_FALSE(_bmj->isBlocked()); } void assertDocsMoved(uint32_t expDocsMovedCnt, uint32_t expMoveContextsCnt) { EXPECT_EQ(expDocsMovedCnt, docsMoved().size()); @@ -856,7 +855,7 @@ struct MaxOutstandingMoveOpsFixture : public ControllerFixture void unblockJob(uint32_t expRunnerCnt) { _moveHandler.clearMoveDoneContexts(); // unblocks job and try to execute it via runner EXPECT_EQ(expRunnerCnt, _runner.runCount); - EXPECT_FALSE(_bmj.isBlocked()); + EXPECT_FALSE(_bmj->isBlocked()); } }; diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp index 5223e20bd5c..866c3c45e02 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp @@ -42,7 +42,7 @@ struct ControllerFixtureBase : public ::testing::Test DummyBucketExecutor _bucketExecutor; MyMoveHandler _moveHandler; DocumentDBTaggedMetrics _metrics; - BucketMoveJobV2 _bmj; + std::shared_ptr<BucketMoveJobV2> _bmj; MyCountJobRunner _runner; ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts); ~ControllerFixtureBase(); @@ -89,11 +89,11 @@ struct ControllerFixtureBase : public ::testing::Test return _calc->asked(); } size_t numPending() { - _bmj.updateMetrics(_metrics); + _bmj->updateMetrics(_metrics); return _metrics.bucketMove.bucketsPending.getLast(); } void runLoop() { - while (!_bmj.isBlocked() && !_bmj.run()) { + while (!_bmj->isBlocked() && !_bmj->run()) { } } void sync() { @@ -123,11 +123,11 @@ ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig _bucketExecutor(4), _moveHandler(*_bucketDB, storeMoveDoneContexts), _metrics("test", 1), - _bmj(_calc, _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb, - _notReady._subDb, _bucketCreateNotifier, _clusterStateHandler, _bucketHandler, - _diskMemUsageNotifier, blockableConfig, - "test", makeBucketSpace()), - _runner(_bmj) + _bmj(std::make_shared<BucketMoveJobV2>(_calc, _moveHandler, _modifiedHandler, _master, _bucketExecutor, + _ready._subDb, _notReady._subDb, _bucketCreateNotifier, + _clusterStateHandler, _bucketHandler, _diskMemUsageNotifier, + blockableConfig, "test", makeBucketSpace())), + _runner(*_bmj) { } @@ -165,13 +165,13 @@ struct OnlyReadyControllerFixture : public ControllerFixtureBase TEST_F(ControllerFixture, require_that_nothing_is_moved_if_bucket_state_says_so) { - EXPECT_TRUE(_bmj.done()); + EXPECT_TRUE(_bmj->done()); addReady(_ready.bucket(1)); addReady(_ready.bucket(2)); - _bmj.recompute(); + _bmj->recompute(); masterExecute([this]() { - EXPECT_TRUE(_bmj.scanAndMove(4, 3)); - EXPECT_TRUE(_bmj.done()); + EXPECT_TRUE(_bmj->scanAndMove(4, 3)); + EXPECT_TRUE(_bmj->done()); }); EXPECT_TRUE(docsMoved().empty()); EXPECT_TRUE(bucketsModified().empty()); @@ -185,12 +185,12 @@ TEST_F(ControllerFixture, require_that_not_ready_bucket_is_moved_to_ready_if_buc addReady(_notReady.bucket(4)); EXPECT_EQ(0, numPending()); - _bmj.recompute(); + _bmj->recompute(); EXPECT_EQ(1, numPending()); masterExecute([this]() { - EXPECT_FALSE(_bmj.done()); - EXPECT_TRUE(_bmj.scanAndMove(4, 3)); - EXPECT_TRUE(_bmj.done()); + EXPECT_FALSE(_bmj->done()); + EXPECT_TRUE(_bmj->scanAndMove(4, 3)); + EXPECT_TRUE(_bmj->done()); }); sync(); EXPECT_EQ(0, numPending()); @@ -206,11 +206,11 @@ TEST_F(ControllerFixture, require_that_ready_bucket_is_moved_to_not_ready_if_buc { // bucket 2 should be moved addReady(_ready.bucket(1)); - _bmj.recompute(); + _bmj->recompute(); masterExecute([this]() { - EXPECT_FALSE(_bmj.done()); - EXPECT_TRUE(_bmj.scanAndMove(4, 3)); - EXPECT_TRUE(_bmj.done()); + EXPECT_FALSE(_bmj->done()); + EXPECT_TRUE(_bmj->scanAndMove(4, 3)); + EXPECT_TRUE(_bmj->done()); }); sync(); EXPECT_EQ(2u, docsMoved().size()); @@ -224,19 +224,19 @@ TEST_F(ControllerFixture, require_that_bucket_is_moved_even_with_error) { // bucket 2 should be moved addReady(_ready.bucket(1)); - _bmj.recompute(); + _bmj->recompute(); failRetrieveForLid(5); masterExecute([this]() { - EXPECT_FALSE(_bmj.done()); - EXPECT_TRUE(_bmj.scanAndMove(4, 3)); - EXPECT_TRUE(_bmj.done()); + EXPECT_FALSE(_bmj->done()); + EXPECT_TRUE(_bmj->scanAndMove(4, 3)); + EXPECT_TRUE(_bmj->done()); }); sync(); - EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj->done()); fixRetriever(); masterExecute([this]() { - EXPECT_TRUE(_bmj.scanAndMove(4, 3)); - EXPECT_TRUE(_bmj.done()); + EXPECT_TRUE(_bmj->scanAndMove(4, 3)); + EXPECT_TRUE(_bmj->done()); }); sync(); EXPECT_EQ(2u, docsMoved().size()); @@ -254,29 +254,29 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps) addReady(_notReady.bucket(3)); addReady(_notReady.bucket(4)); - _bmj.recompute(); + _bmj->recompute(); EXPECT_EQ(3, numPending()); masterExecute([this]() { - EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj->done()); - EXPECT_FALSE(_bmj.scanAndMove(1, 2)); - EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj->scanAndMove(1, 2)); + EXPECT_FALSE(_bmj->done()); }); sync(); EXPECT_EQ(2, numPending()); EXPECT_EQ(2u, docsMoved().size()); masterExecute([this]() { - EXPECT_FALSE(_bmj.scanAndMove(1, 2)); - EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj->scanAndMove(1, 2)); + EXPECT_FALSE(_bmj->done()); }); sync(); EXPECT_EQ(2, numPending()); EXPECT_EQ(4u, docsMoved().size()); masterExecute([this]() { - EXPECT_FALSE(_bmj.scanAndMove(1, 2)); - EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj->scanAndMove(1, 2)); + EXPECT_FALSE(_bmj->done()); }); sync(); EXPECT_EQ(1, numPending()); @@ -284,8 +284,8 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps) // move bucket 4, docs 3 masterExecute([this]() { - EXPECT_TRUE(_bmj.scanAndMove(1, 2)); - EXPECT_TRUE(_bmj.done()); + EXPECT_TRUE(_bmj->scanAndMove(1, 2)); + EXPECT_TRUE(_bmj->done()); }); sync(); EXPECT_EQ(0, numPending()); @@ -302,19 +302,19 @@ TEST_F(ControllerFixture, require_that_last_bucket_is_moved_before_reporting_don addReady(_ready.bucket(1)); addReady(_ready.bucket(2)); addReady(_notReady.bucket(4)); - _bmj.recompute(); + _bmj->recompute(); masterExecute([this]() { - EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj->done()); - EXPECT_FALSE(_bmj.scanAndMove(1, 1)); - EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj->scanAndMove(1, 1)); + EXPECT_FALSE(_bmj->done()); }); sync(); EXPECT_EQ(1u, docsMoved().size()); EXPECT_EQ(4u, calcAsked().size()); masterExecute([this]() { - EXPECT_TRUE(_bmj.scanAndMove(1, 2)); - EXPECT_TRUE(_bmj.done()); + EXPECT_TRUE(_bmj->scanAndMove(1, 2)); + EXPECT_TRUE(_bmj->done()); }); sync(); EXPECT_EQ(3u, docsMoved().size()); @@ -326,12 +326,12 @@ TEST_F(ControllerFixture, require_that_active_bucket_is_not_moved_from_ready_to_ { // bucket 1 should be moved but is active addReady(_ready.bucket(2)); - _bmj.recompute(); - EXPECT_FALSE(_bmj.done()); + _bmj->recompute(); + EXPECT_FALSE(_bmj->done()); activateBucket(_ready.bucket(1)); masterExecute([this]() { - EXPECT_TRUE(_bmj.scanAndMove(4, 3)); // scan all, delay active bucket 1 - EXPECT_TRUE(_bmj.done()); + EXPECT_TRUE(_bmj->scanAndMove(4, 3)); // scan all, delay active bucket 1 + EXPECT_TRUE(_bmj->done()); }); sync(); EXPECT_EQ(0u, docsMoved().size()); @@ -339,9 +339,9 @@ TEST_F(ControllerFixture, require_that_active_bucket_is_not_moved_from_ready_to_ deactivateBucket(_ready.bucket(1)); masterExecute([this]() { - EXPECT_FALSE(_bmj.done()); - EXPECT_TRUE(_bmj.scanAndMove(4, 3)); // move delayed and de-activated bucket 1 - EXPECT_TRUE(_bmj.done()); + EXPECT_FALSE(_bmj->done()); + EXPECT_TRUE(_bmj->scanAndMove(4, 3)); // move delayed and de-activated bucket 1 + EXPECT_TRUE(_bmj->done()); }); sync(); EXPECT_EQ(3u, docsMoved().size()); @@ -355,9 +355,9 @@ TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_w addReady(_ready.bucket(2)); masterExecute([this]() { - _bmj.recompute(); - _bmj.scanAndMove(1, 1); - EXPECT_FALSE(_bmj.done()); + _bmj->recompute(); + _bmj->scanAndMove(1, 1); + EXPECT_FALSE(_bmj->done()); }); sync(); EXPECT_EQ(1u, docsMoved().size()); @@ -367,8 +367,8 @@ TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_w EXPECT_EQ(4u, calcAsked().size()); EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); _calc->resetAsked(); - _bmj.scanAndMove(1, 1); - EXPECT_FALSE(_bmj.done()); + _bmj->scanAndMove(1, 1); + EXPECT_FALSE(_bmj->done()); }); sync(); EXPECT_EQ(1u, docsMoved().size()); @@ -383,7 +383,7 @@ TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_w _calc->resetAsked(); changeCalc(); // not cancelled. No active bucket move EXPECT_EQ(4u, calcAsked().size()); - _bmj.scanAndMove(1, 1); + _bmj->scanAndMove(1, 1); }); sync(); EXPECT_EQ(1u, docsMoved().size()); @@ -391,9 +391,9 @@ TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_w EXPECT_EQ(_ready.bucket(2), calcAsked()[1]); EXPECT_EQ(_notReady.bucket(3), calcAsked()[2]); masterExecute([this]() { - _bmj.scanAndMove(2, 3); + _bmj->scanAndMove(2, 3); }); - EXPECT_TRUE(_bmj.done()); + EXPECT_TRUE(_bmj->done()); sync(); EXPECT_EQ(3u, docsMoved().size()); EXPECT_EQ(4u, calcAsked().size()); @@ -405,10 +405,10 @@ TEST_F(ControllerFixture, require_that_de_activated_bucket_is_not_moved_if_new_c { // bucket 1 should be moved addReady(_ready.bucket(2)); - _bmj.recompute(); + _bmj->recompute(); masterExecute([this]() { activateBucket(_ready.bucket(1)); - _bmj.scanAndMove(4, 3); // scan all, delay active bucket 1 + _bmj->scanAndMove(4, 3); // scan all, delay active bucket 1 }); sync(); EXPECT_EQ(0u, docsMoved().size()); @@ -418,7 +418,7 @@ TEST_F(ControllerFixture, require_that_de_activated_bucket_is_not_moved_if_new_c deactivateBucket(_ready.bucket(1)); addReady(_ready.bucket(1)); changeCalc(); - _bmj.scanAndMove(4, 3); // consider delayed bucket 3 + _bmj->scanAndMove(4, 3); // consider delayed bucket 3 }); sync(); EXPECT_EQ(0u, docsMoved().size()); @@ -433,9 +433,9 @@ TEST_F(ControllerFixture, ready_bucket_not_moved_to_not_ready_if_node_is_marked_ // Bucket 2 would be moved from ready to not ready in a non-retired case, but not when retired. addReady(_ready.bucket(1)); masterExecute([this]() { - _bmj.recompute(); - _bmj.scanAndMove(4, 3); - EXPECT_TRUE(_bmj.done()); + _bmj->recompute(); + _bmj->scanAndMove(4, 3); + EXPECT_TRUE(_bmj->done()); }); sync(); EXPECT_EQ(0u, docsMoved().size()); @@ -450,9 +450,9 @@ TEST_F(ControllerFixture, inactive_not_ready_bucket_not_moved_to_ready_if_node_i addReady(_ready.bucket(2)); addReady(_notReady.bucket(3)); masterExecute([this]() { - _bmj.recompute(); - _bmj.scanAndMove(4, 3); - EXPECT_TRUE(_bmj.done()); + _bmj->recompute(); + _bmj->scanAndMove(4, 3); + EXPECT_TRUE(_bmj->done()); }); sync(); EXPECT_EQ(0u, docsMoved().size()); @@ -464,11 +464,11 @@ TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_rea addReady(_ready.bucket(1)); addReady(_ready.bucket(2)); addReady(_notReady.bucket(3)); - _bmj.recompute(); + _bmj->recompute(); masterExecute([this]() { activateBucket(_notReady.bucket(3)); - _bmj.scanAndMove(4, 3); - EXPECT_TRUE(_bmj.done()); + _bmj->scanAndMove(4, 3); + EXPECT_TRUE(_bmj->done()); }); sync(); ASSERT_EQ(2u, docsMoved().size()); @@ -481,27 +481,27 @@ TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_rea TEST_F(ControllerFixture, require_that_notifyCreateBucket_causes_bucket_to_be_reconsidered_by_job) { - EXPECT_TRUE(_bmj.done()); + EXPECT_TRUE(_bmj->done()); addReady(_ready.bucket(1)); addReady(_ready.bucket(2)); runLoop(); - EXPECT_TRUE(_bmj.done()); + EXPECT_TRUE(_bmj->done()); sync(); EXPECT_TRUE(docsMoved().empty()); EXPECT_TRUE(bucketsModified().empty()); addReady(_notReady.bucket(3)); // bucket 3 now ready, no notify - EXPECT_TRUE(_bmj.done()); // move job still believes work done + EXPECT_TRUE(_bmj->done()); // move job still believes work done sync(); EXPECT_TRUE(bucketsModified().empty()); masterExecute([this]() { - _bmj.notifyCreateBucket(_bucketDB->takeGuard(), _notReady.bucket(3)); // reconsider bucket 3 - EXPECT_FALSE(_bmj.done()); + _bmj->notifyCreateBucket(_bucketDB->takeGuard(), _notReady.bucket(3)); // reconsider bucket 3 + EXPECT_FALSE(_bmj->done()); EXPECT_TRUE(bucketsModified().empty()); }); sync(); EXPECT_TRUE(bucketsModified().empty()); runLoop(); - EXPECT_TRUE(_bmj.done()); + EXPECT_TRUE(_bmj->done()); sync(); EXPECT_EQ(1u, bucketsModified().size()); @@ -518,22 +518,22 @@ struct ResourceLimitControllerFixture : public ControllerFixture void testJobStopping(DiskMemUsageState blockingUsageState) { // Bucket 1 should be moved addReady(_ready.bucket(2)); - _bmj.recompute(); - EXPECT_FALSE(_bmj.done()); - // Note: This depends on _bmj.run() moving max 1 documents - EXPECT_FALSE(_bmj.run()); + _bmj->recompute(); + EXPECT_FALSE(_bmj->done()); + // Note: This depends on _bmj->run() moving max 1 documents + EXPECT_FALSE(_bmj->run()); sync(); EXPECT_EQ(1u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); // Notify that we've over limit _diskMemUsageNotifier.notify(blockingUsageState); - EXPECT_TRUE(_bmj.run()); + EXPECT_TRUE(_bmj->run()); sync(); EXPECT_EQ(1u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); // Notify that we've under limit _diskMemUsageNotifier.notify(DiskMemUsageState()); - EXPECT_FALSE(_bmj.run()); + EXPECT_FALSE(_bmj->run()); sync(); EXPECT_EQ(2u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); @@ -542,16 +542,16 @@ struct ResourceLimitControllerFixture : public ControllerFixture void testJobNotStopping(DiskMemUsageState blockingUsageState) { // Bucket 1 should be moved addReady(_ready.bucket(2)); - _bmj.recompute(); - EXPECT_FALSE(_bmj.done()); - // Note: This depends on _bmj.run() moving max 1 documents - EXPECT_FALSE(_bmj.run()); + _bmj->recompute(); + EXPECT_FALSE(_bmj->done()); + // Note: This depends on _bmj->run() moving max 1 documents + EXPECT_FALSE(_bmj->run()); sync(); EXPECT_EQ(1u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); // Notify that we've over limit, but not over adjusted limit _diskMemUsageNotifier.notify(blockingUsageState); - EXPECT_FALSE(_bmj.run()); + EXPECT_FALSE(_bmj->run()); sync(); EXPECT_EQ(2u, docsMoved().size()); EXPECT_EQ(0u, bucketsModified().size()); @@ -600,24 +600,24 @@ struct MaxOutstandingMoveOpsFixture : public ControllerFixtureBase _builder.createDocs(14, 4, 5); _notReady.insertDocs(_builder.getDocs()); addReady(_ready.bucket(3)); - _bmj.recompute(); + _bmj->recompute(); } void assertRunToBlocked() { - EXPECT_TRUE(_bmj.run()); // job becomes blocked as max outstanding limit is reached - EXPECT_FALSE(_bmj.done()); - EXPECT_TRUE(_bmj.isBlocked()); - EXPECT_TRUE(_bmj.isBlocked(BlockedReason::OUTSTANDING_OPS)); + EXPECT_TRUE(_bmj->run()); // job becomes blocked as max outstanding limit is reached + EXPECT_FALSE(_bmj->done()); + EXPECT_TRUE(_bmj->isBlocked()); + EXPECT_TRUE(_bmj->isBlocked(BlockedReason::OUTSTANDING_OPS)); } void assertRunToNotBlocked() { - EXPECT_FALSE(_bmj.run()); - EXPECT_FALSE(_bmj.done()); - EXPECT_FALSE(_bmj.isBlocked()); + EXPECT_FALSE(_bmj->run()); + EXPECT_FALSE(_bmj->done()); + EXPECT_FALSE(_bmj->isBlocked()); } void assertRunToFinished() { - EXPECT_TRUE(_bmj.run()); - EXPECT_TRUE(_bmj.done()); - EXPECT_FALSE(_bmj.isBlocked()); + EXPECT_TRUE(_bmj->run()); + EXPECT_TRUE(_bmj->done()); + EXPECT_FALSE(_bmj->isBlocked()); } void assertDocsMoved(uint32_t expDocsMovedCnt, uint32_t expMoveContextsCnt) { EXPECT_EQ(expDocsMovedCnt, docsMoved().size()); @@ -626,7 +626,7 @@ struct MaxOutstandingMoveOpsFixture : public ControllerFixtureBase void unblockJob(uint32_t expRunnerCnt) { _moveHandler.clearMoveDoneContexts(); // unblocks job and try to execute it via runner EXPECT_EQ(expRunnerCnt, _runner.runCount); - EXPECT_FALSE(_bmj.isBlocked()); + EXPECT_FALSE(_bmj->isBlocked()); } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp index fc2eeaab661..f0847ff051e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp @@ -4,7 +4,6 @@ #include "imaintenancejobrunner.h" #include "ibucketstatechangednotifier.h" #include "iclusterstatechangednotifier.h" -#include "maintenancedocumentsubdb.h" #include "i_disk_mem_usage_notifier.h" #include "ibucketmodifiedhandler.h" #include "move_operation_limiter.h" @@ -87,8 +86,6 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> & _bucketsInFlight(), _buckets2Move(), _stopped(false), - _startedCount(0), - _executedCount(0), _bucketsPending(0), _bucketCreateNotifier(bucketCreateNotifier), _clusterStateChangedNotifier(clusterStateChangedNotifier), @@ -151,24 +148,10 @@ BucketMoveJobV2::needMove(const ScanIterator &itr) const { return {true, wantReady}; } -namespace { - -class IncOnDestruct { -public: - IncOnDestruct(std::atomic<size_t> & count) : _count(count) {} - ~IncOnDestruct() { - _count.fetch_add(1, std::memory_order_relaxed); - } -private: - std::atomic<size_t> & _count; -}; - -} - -class StartMove : public storage::spi::BucketTask { +class BucketMoveJobV2::StartMove : public storage::spi::BucketTask { public: using IDestructorCallbackSP = std::shared_ptr<vespalib::IDestructorCallback>; - StartMove(BucketMoveJobV2 & job, std::shared_ptr<BucketMover> mover, + StartMove(std::shared_ptr<BucketMoveJobV2> job, std::shared_ptr<BucketMover> mover, std::vector<BucketMover::MoveKey> keys, IDestructorCallbackSP opsTracker) : _job(job), @@ -180,27 +163,27 @@ public: void run(const Bucket &bucket, IDestructorCallbackSP onDone) override { assert(_mover->getBucket() == bucket.getBucketId()); using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallbackSP, IDestructorCallbackSP>>; - _job.prepareMove(std::move(_mover), std::move(_keys), - std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone)))); + BucketMoveJobV2::prepareMove(std::move(_job), std::move(_mover), std::move(_keys), + std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone)))); } void fail(const Bucket &bucket) override { - _job.failOperation(bucket.getBucketId()); + BucketMoveJobV2::failOperation(std::move(_job), bucket.getBucketId()); } private: - BucketMoveJobV2 & _job; + std::shared_ptr<BucketMoveJobV2> _job; std::shared_ptr<BucketMover> _mover; std::vector<BucketMover::MoveKey> _keys; IDestructorCallbackSP _opsTracker; }; void -BucketMoveJobV2::failOperation(BucketId bucketId) { - IncOnDestruct countGuard(_executedCount); - _master.execute(makeLambdaTask([this, bucketId]() { - if (_stopped.load(std::memory_order_relaxed)) return; - considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId); +BucketMoveJobV2::failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bucketId) { + auto & master = job->_master; + master.execute(makeLambdaTask([job=std::move(job), bucketId]() { + if (job->_stopped.load(std::memory_order_relaxed)) return; + job->considerBucket(job->_ready.meta_store()->getBucketDB().takeGuard(), bucketId); })); } @@ -213,19 +196,18 @@ BucketMoveJobV2::startMove(BucketMoverSP mover, size_t maxDocsToMove) { if (keys.empty()) return; mover->updateLastValidGid(keys.back()._gid); Bucket spiBucket(document::Bucket(_bucketSpace, mover->getBucket())); - auto bucketTask = std::make_unique<StartMove>(*this, std::move(mover), std::move(keys), getLimiter().beginOperation()); - _startedCount.fetch_add(1, std::memory_order_relaxed); + auto bucketTask = std::make_unique<StartMove>(shared_from_this(), std::move(mover), std::move(keys), getLimiter().beginOperation()); _bucketExecutor.execute(spiBucket, std::move(bucketTask)); } void -BucketMoveJobV2::prepareMove(BucketMoverSP mover, std::vector<MoveKey> keys, IDestructorCallbackSP onDone) +BucketMoveJobV2::prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMoverSP mover, std::vector<MoveKey> keys, IDestructorCallbackSP onDone) { - IncOnDestruct countGuard(_executedCount); auto moveOps = mover->createMoveOperations(std::move(keys)); - _master.execute(makeLambdaTask([this, mover=std::move(mover), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable { - if (_stopped.load(std::memory_order_relaxed)) return; - completeMove(std::move(mover), std::move(moveOps), std::move(onDone)); + auto & master = job->_master; + master.execute(makeLambdaTask([job=std::move(job), mover=std::move(mover), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable { + if (job->_stopped.load(std::memory_order_relaxed)) return; + job->completeMove(std::move(mover), std::move(moveOps), std::move(onDone)); })); } @@ -437,18 +419,10 @@ BucketMoveJobV2::notifyDiskMemUsage(DiskMemUsageState state) internalNotifyDiskMemUsage(state); } -bool -BucketMoveJobV2::inSync() const { - return _executedCount == _startedCount; -} - void BucketMoveJobV2::onStop() { // Called by master write thread _stopped = true; - while ( ! inSync() ) { - std::this_thread::sleep_for(1ms); - } } void diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h index eba8fd9a62c..871bbcca15a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h @@ -7,6 +7,7 @@ #include "i_disk_mem_usage_listener.h" #include "ibucketstatechangedhandler.h" #include "iclusterstatechangedhandler.h" +#include "maintenancedocumentsubdb.h" #include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h> #include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h> @@ -61,8 +62,8 @@ private: IBucketModifiedHandler &_modifiedHandler; IThreadService &_master; BucketExecutor &_bucketExecutor; - const MaintenanceDocumentSubDB &_ready; - const MaintenanceDocumentSubDB &_notReady; + const MaintenanceDocumentSubDB _ready; + const MaintenanceDocumentSubDB _notReady; const document::BucketSpace _bucketSpace; size_t _iterateCount; Movers _movers; @@ -70,8 +71,6 @@ private: BucketMoveSet _buckets2Move; std::atomic<bool> _stopped; - std::atomic<size_t> _startedCount; - std::atomic<size_t> _executedCount; std::atomic<size_t> _bucketsPending; bucketdb::IBucketCreateNotifier &_bucketCreateNotifier; @@ -80,10 +79,10 @@ private: IDiskMemUsageNotifier &_diskMemUsageNotifier; void startMove(BucketMoverSP mover, size_t maxDocsToMove); - void prepareMove(BucketMoverSP mover, std::vector<MoveKey> keysToMove, IDestructorCallbackSP context); + static void prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMoverSP mover, + std::vector<MoveKey> keysToMove, IDestructorCallbackSP context); void completeMove(BucketMoverSP mover, GuardedMoveOps moveOps, IDestructorCallbackSP context); bool checkIfMoverComplete(const BucketMover & mover); - void checkForReschedule(const bucketdb::Guard & guard, BucketId bucket); void considerBucket(const bucketdb::Guard & guard, BucketId bucket); void reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket); void updatePending(); @@ -94,9 +93,9 @@ private: BucketMoverSP greedyCreateMover(); void backFillMovers(); void moveDocs(size_t maxDocsToMove); - void failOperation(BucketId bucket); + static void failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bucket); void recompute(const bucketdb::Guard & guard); - friend class StartMove; + class StartMove; public: BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc, IDocumentMoveHandler &moveHandler, @@ -118,7 +117,6 @@ public: bool scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket); bool done() const; void recompute(); // Only for testing - bool inSync() const; bool run() override; void notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc) override; |