summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-04-10 13:43:22 +0200
committerGitHub <noreply@github.com>2021-04-10 13:43:22 +0200
commit8ac23b7631bf41a61daf703bcc2a1edeaf4ff58b (patch)
treeea59761dc07c51080136eb9e7d4a803bf8012057
parent2b54944bd62c757813b900c06cc10c20bddd7867 (diff)
parent4731819e1378098d56cda6d6d8dad522f737b241 (diff)
Merge pull request #17353 from vespa-engine/balder/avoid-blocking-during-stop
Use shared_ptr to for bucket move job to avoid blocking duirng reconfig.
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp293
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp198
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp60
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h16
4 files changed, 269 insertions, 298 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
index 85f80293377..bb7180dadf1 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
@@ -72,7 +72,7 @@ struct ControllerFixtureBase : public ::testing::Test
MyFrozenBucketHandler _fbh;
BucketCreateNotifier _bucketCreateNotifier;
test::DiskMemUsageNotifier _diskMemUsageNotifier;
- BucketMoveJob _bmj;
+ std::shared_ptr<BucketMoveJob> _bmj;
MyCountJobRunner _runner;
ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts);
~ControllerFixtureBase();
@@ -97,7 +97,7 @@ struct ControllerFixtureBase : public ::testing::Test
}
ControllerFixtureBase &remFrozen(const BucketId &bucket) {
_fbh.remFrozen(bucket);
- _bmj.notifyThawedBucket(bucket);
+ _bmj->notifyThawedBucket(bucket);
return *this;
}
ControllerFixtureBase &activateBucket(const BucketId &bucket) {
@@ -120,7 +120,7 @@ struct ControllerFixtureBase : public ::testing::Test
return _calc->asked();
}
void runLoop() {
- while (!_bmj.isBlocked() && !_bmj.run()) {
+ while (!_bmj->isBlocked() && !_bmj->run()) {
}
}
};
@@ -137,11 +137,10 @@ ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig
_fbh(),
_bucketCreateNotifier(),
_diskMemUsageNotifier(),
- _bmj(_calc, _moveHandler, _modifiedHandler, _ready._subDb,
- _notReady._subDb, _fbh, _bucketCreateNotifier, _clusterStateHandler, _bucketHandler,
- _diskMemUsageNotifier, blockableConfig,
- "test", makeBucketSpace()),
- _runner(_bmj)
+ _bmj(std::make_shared<BucketMoveJob>(_calc, _moveHandler, _modifiedHandler, _ready._subDb, _notReady._subDb,
+ _fbh, _bucketCreateNotifier, _clusterStateHandler, _bucketHandler,
+ _diskMemUsageNotifier, blockableConfig, "test", makeBucketSpace())),
+ _runner(*_bmj)
{
}
@@ -179,11 +178,11 @@ struct OnlyReadyControllerFixture : public ControllerFixtureBase
TEST_F(ControllerFixture, require_that_nothing_is_moved_if_bucket_state_says_so)
{
- EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj->done());
addReady(_ready.bucket(1));
addReady(_ready.bucket(2));
- _bmj.scanAndMove(4, 3);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(4, 3);
+ EXPECT_TRUE(_bmj->done());
EXPECT_TRUE(docsMoved().empty());
EXPECT_TRUE(bucketsModified().empty());
}
@@ -194,8 +193,8 @@ TEST_F(ControllerFixture, require_that_not_ready_bucket_is_moved_to_ready_if_buc
addReady(_ready.bucket(1));
addReady(_ready.bucket(2));
addReady(_notReady.bucket(4));
- _bmj.scanAndMove(4, 3);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(4, 3);
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(3u, docsMoved().size());
assertEqual(_notReady.bucket(4), _notReady.docs(4)[0], 2, 1, docsMoved()[0]);
assertEqual(_notReady.bucket(4), _notReady.docs(4)[1], 2, 1, docsMoved()[1]);
@@ -208,8 +207,8 @@ TEST_F(ControllerFixture, require_that_ready_bucket_is_moved_to_not_ready_if_buc
{
// bucket 2 should be moved
addReady(_ready.bucket(1));
- _bmj.scanAndMove(4, 3);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(4, 3);
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(2u, docsMoved().size());
assertEqual(_ready.bucket(2), _ready.docs(2)[0], 1, 2, docsMoved()[0]);
assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]);
@@ -225,14 +224,14 @@ TEST_F(ControllerFixture, require_that_maxBucketsToScan_is_taken_into_considerat
addReady(_notReady.bucket(4));
// buckets 1, 2, and 3 considered
- _bmj.scanAndMove(3, 3);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(3, 3);
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
// move bucket 4
- _bmj.scanAndMove(1, 4);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(1, 4);
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(3u, docsMoved().size());
assertEqual(_notReady.bucket(4), _notReady.docs(4)[0], 2, 1, docsMoved()[0]);
assertEqual(_notReady.bucket(4), _notReady.docs(4)[1], 2, 1, docsMoved()[1]);
@@ -249,14 +248,14 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps)
addReady(_notReady.bucket(4));
// consider move bucket 1
- _bmj.scanAndMove(1, 2);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(1, 2);
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
// move bucket 2, docs 1,2
- _bmj.scanAndMove(1, 2);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(1, 2);
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(2u, docsMoved().size());
EXPECT_TRUE(assertEqual(_ready.bucket(2), _ready.docs(2)[0], 1, 2, docsMoved()[0]));
EXPECT_TRUE(assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]));
@@ -264,8 +263,8 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps)
EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
// move bucket 3, docs 1,2
- _bmj.scanAndMove(1, 2);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(1, 2);
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(4u, docsMoved().size());
EXPECT_TRUE(assertEqual(_notReady.bucket(3), _notReady.docs(3)[0], 2, 1, docsMoved()[2]));
EXPECT_TRUE(assertEqual(_notReady.bucket(3), _notReady.docs(3)[1], 2, 1, docsMoved()[3]));
@@ -273,16 +272,16 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps)
EXPECT_EQ(_notReady.bucket(3), bucketsModified()[1]);
// move bucket 4, docs 1,2
- _bmj.scanAndMove(1, 2);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(1, 2);
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(6u, docsMoved().size());
EXPECT_TRUE(assertEqual(_notReady.bucket(4), _notReady.docs(4)[0], 2, 1, docsMoved()[4]));
EXPECT_TRUE(assertEqual(_notReady.bucket(4), _notReady.docs(4)[1], 2, 1, docsMoved()[5]));
EXPECT_EQ(2u, bucketsModified().size());
// move bucket 4, docs 3
- _bmj.scanAndMove(1, 2);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(1, 2);
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(7u, docsMoved().size());
EXPECT_TRUE(assertEqual(_notReady.bucket(4), _notReady.docs(4)[2], 2, 1, docsMoved()[6]));
EXPECT_EQ(3u, bucketsModified().size());
@@ -297,10 +296,10 @@ TEST_F(ControllerFixture, require_that_we_can_change_calculator_and_continue_sca
addReady(_ready.bucket(2));
// start with bucket2
- _bmj.scanAndMove(1, 0);
+ _bmj->scanAndMove(1, 0);
changeCalc();
- _bmj.scanAndMove(5, 0);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(5, 0);
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(4u, calcAsked().size());
EXPECT_EQ(_ready.bucket(2), calcAsked()[0]);
EXPECT_EQ(_notReady.bucket(3), calcAsked()[1]);
@@ -309,10 +308,10 @@ TEST_F(ControllerFixture, require_that_we_can_change_calculator_and_continue_sca
// start with bucket3
changeCalc();
- _bmj.scanAndMove(2, 0);
+ _bmj->scanAndMove(2, 0);
changeCalc();
- _bmj.scanAndMove(5, 0);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(5, 0);
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(4u, calcAsked().size());
EXPECT_EQ(_notReady.bucket(3), calcAsked()[0]);
EXPECT_EQ(_notReady.bucket(4), calcAsked()[1]);
@@ -321,10 +320,10 @@ TEST_F(ControllerFixture, require_that_we_can_change_calculator_and_continue_sca
// start with bucket4
changeCalc();
- _bmj.scanAndMove(3, 0);
+ _bmj->scanAndMove(3, 0);
changeCalc();
- _bmj.scanAndMove(5, 0);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(5, 0);
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(4u, calcAsked().size());
EXPECT_EQ(_notReady.bucket(4), calcAsked()[0]);
EXPECT_EQ(_ready.bucket(1), calcAsked()[1]);
@@ -333,8 +332,8 @@ TEST_F(ControllerFixture, require_that_we_can_change_calculator_and_continue_sca
// start with bucket1
changeCalc();
- _bmj.scanAndMove(5, 0);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(5, 0);
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(4u, calcAsked().size());
EXPECT_EQ(_ready.bucket(1), calcAsked()[0]);
EXPECT_EQ(_ready.bucket(2), calcAsked()[1]);
@@ -343,16 +342,16 @@ TEST_F(ControllerFixture, require_that_we_can_change_calculator_and_continue_sca
// change calc in second pass
changeCalc();
- _bmj.scanAndMove(3, 0);
+ _bmj->scanAndMove(3, 0);
changeCalc();
- _bmj.scanAndMove(2, 0);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(2, 0);
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(2u, calcAsked().size());
EXPECT_EQ(_notReady.bucket(4), calcAsked()[0]);
EXPECT_EQ(_ready.bucket(1), calcAsked()[1]);
changeCalc();
- _bmj.scanAndMove(5, 0);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(5, 0);
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(4u, calcAsked().size());
EXPECT_EQ(_ready.bucket(2), calcAsked()[0]);
EXPECT_EQ(_notReady.bucket(3), calcAsked()[1]);
@@ -361,22 +360,22 @@ TEST_F(ControllerFixture, require_that_we_can_change_calculator_and_continue_sca
// check 1 bucket at a time, start with bucket2
changeCalc();
- _bmj.scanAndMove(1, 0);
+ _bmj->scanAndMove(1, 0);
changeCalc();
- _bmj.scanAndMove(1, 0);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(1, 0);
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(1u, calcAsked().size());
EXPECT_EQ(_ready.bucket(2), calcAsked()[0]);
- _bmj.scanAndMove(1, 0);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(1, 0);
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(2u, calcAsked().size());
EXPECT_EQ(_notReady.bucket(3), calcAsked()[1]);
- _bmj.scanAndMove(1, 0);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(1, 0);
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(3u, calcAsked().size());
EXPECT_EQ(_notReady.bucket(4), calcAsked()[2]);
- _bmj.scanAndMove(1, 0);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(1, 0);
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(4u, calcAsked().size());
EXPECT_EQ(_ready.bucket(1), calcAsked()[3]);
}
@@ -385,16 +384,16 @@ TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_w
{
// bucket 1 should be moved
addReady(_ready.bucket(2));
- _bmj.scanAndMove(3, 1);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(3, 1);
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(1u, docsMoved().size());
EXPECT_EQ(1u, calcAsked().size());
changeCalc(); // Not cancelled, bucket 1 still moving to notReady
EXPECT_EQ(1u, calcAsked().size());
EXPECT_EQ(_ready.bucket(1), calcAsked()[0]);
_calc->resetAsked();
- _bmj.scanAndMove(2, 1);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(2, 1);
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(1u, docsMoved().size());
EXPECT_EQ(0u, calcAsked().size());
addReady(_ready.bucket(1));
@@ -406,13 +405,13 @@ TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_w
changeCalc(); // not cancelled. No active bucket move
EXPECT_EQ(0u, calcAsked().size());
_calc->resetAsked();
- _bmj.scanAndMove(2, 1);
+ _bmj->scanAndMove(2, 1);
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(2u, calcAsked().size());
EXPECT_EQ(_ready.bucket(2), calcAsked()[0]);
EXPECT_EQ(_notReady.bucket(3), calcAsked()[1]);
- _bmj.scanAndMove(2, 3);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(2, 3);
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(3u, docsMoved().size());
EXPECT_EQ(4u, calcAsked().size());
EXPECT_EQ(_notReady.bucket(4), calcAsked()[2]);
@@ -425,12 +424,12 @@ TEST_F(ControllerFixture, require_that_last_bucket_is_moved_before_reporting_don
addReady(_ready.bucket(1));
addReady(_ready.bucket(2));
addReady(_notReady.bucket(4));
- _bmj.scanAndMove(4, 1);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(4, 1);
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(1u, docsMoved().size());
EXPECT_EQ(4u, calcAsked().size());
- _bmj.scanAndMove(0, 2);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(0, 2);
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(3u, docsMoved().size());
EXPECT_EQ(4u, calcAsked().size());
}
@@ -440,13 +439,13 @@ TEST_F(ControllerFixture, require_that_frozen_bucket_is_not_moved_until_thawed)
// bucket 1 should be moved but is frozen
addReady(_ready.bucket(2));
addFrozen(_ready.bucket(1));
- _bmj.scanAndMove(4, 3); // scan all, delay frozen bucket 1
+ _bmj->scanAndMove(4, 3); // scan all, delay frozen bucket 1
remFrozen(_ready.bucket(1));
- EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
- _bmj.scanAndMove(0, 3); // move delayed and thawed bucket 1
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(0, 3); // move delayed and thawed bucket 1
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(3u, docsMoved().size());
EXPECT_EQ(1u, bucketsModified().size());
EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]);
@@ -460,19 +459,19 @@ TEST_F(ControllerFixture, require_that_thawed_bucket_is_moved_before_other_bucke
addReady(_notReady.bucket(3));
addReady(_notReady.bucket(4));
addFrozen(_ready.bucket(2));
- _bmj.scanAndMove(3, 2); // delay bucket 2, move bucket 3
+ _bmj->scanAndMove(3, 2); // delay bucket 2, move bucket 3
remFrozen(_ready.bucket(2));
- EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(2u, docsMoved().size());
EXPECT_EQ(1u, bucketsModified().size());
EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]);
- _bmj.scanAndMove(2, 2); // move thawed bucket 2
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(2, 2); // move thawed bucket 2
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(4u, docsMoved().size());
EXPECT_EQ(2u, bucketsModified().size());
EXPECT_EQ(_ready.bucket(2), bucketsModified()[1]);
- _bmj.scanAndMove(1, 4); // move bucket 4
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(1, 4); // move bucket 4
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(7u, docsMoved().size());
EXPECT_EQ(3u, bucketsModified().size());
EXPECT_EQ(_notReady.bucket(4), bucketsModified()[2]);
@@ -483,31 +482,31 @@ TEST_F(ControllerFixture, require_that_re_frozen_thawed_bucket_is_not_moved_unti
// bucket 1 should be moved but is re-frozen
addReady(_ready.bucket(2));
addFrozen(_ready.bucket(1));
- _bmj.scanAndMove(1, 0); // scan, delay frozen bucket 1
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(1, 0); // scan, delay frozen bucket 1
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
EXPECT_EQ(1u, calcAsked().size());
EXPECT_EQ(_ready.bucket(1), calcAsked()[0]);
remFrozen(_ready.bucket(1));
addFrozen(_ready.bucket(1));
- _bmj.scanAndMove(1, 0); // scan, but nothing to move
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(1, 0); // scan, but nothing to move
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
EXPECT_EQ(3u, calcAsked().size());
EXPECT_EQ(_ready.bucket(1), calcAsked()[1]);
EXPECT_EQ(_ready.bucket(2), calcAsked()[2]);
remFrozen(_ready.bucket(1));
- _bmj.scanAndMove(3, 4); // move delayed and thawed bucket 1
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(3, 4); // move delayed and thawed bucket 1
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(3u, docsMoved().size());
EXPECT_EQ(1u, bucketsModified().size());
EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]);
EXPECT_EQ(4u, calcAsked().size());
EXPECT_EQ(_ready.bucket(1), calcAsked()[3]);
- _bmj.scanAndMove(2, 0); // scan the rest
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(2, 0); // scan the rest
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(3u, docsMoved().size());
EXPECT_EQ(1u, bucketsModified().size());
EXPECT_EQ(6u, calcAsked().size());
@@ -520,15 +519,15 @@ TEST_F(ControllerFixture, require_that_thawed_bucket_is_not_moved_if_new_calcula
addReady(_ready.bucket(2));
addReady(_notReady.bucket(3));
addFrozen(_notReady.bucket(3));
- _bmj.scanAndMove(4, 3); // scan all, delay frozen bucket 3
+ _bmj->scanAndMove(4, 3); // scan all, delay frozen bucket 3
remFrozen(_notReady.bucket(3));
- EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
EXPECT_EQ(4u, calcAsked().size());
changeCalc();
remReady(_notReady.bucket(3));
- _bmj.scanAndMove(0, 3); // consider delayed bucket 3
+ _bmj->scanAndMove(0, 3); // consider delayed bucket 3
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
EXPECT_EQ(1u, calcAsked().size());
@@ -541,8 +540,8 @@ TEST_F(ControllerFixture, require_that_current_bucket_mover_is_cancelled_if_buck
addReady(_ready.bucket(1));
addReady(_ready.bucket(2));
addReady(_notReady.bucket(3));
- _bmj.scanAndMove(3, 1); // move 1 doc from bucket 3
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(3, 1); // move 1 doc from bucket 3
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(1u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
EXPECT_EQ(3u, calcAsked().size());
@@ -551,22 +550,22 @@ TEST_F(ControllerFixture, require_that_current_bucket_mover_is_cancelled_if_buck
EXPECT_EQ(_notReady.bucket(3), calcAsked()[2]);
addFrozen(_notReady.bucket(3));
- _bmj.scanAndMove(1, 3); // done scanning
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(1, 3); // done scanning
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(1u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
EXPECT_EQ(3u, calcAsked().size());
- _bmj.scanAndMove(1, 3); // done scanning
+ _bmj->scanAndMove(1, 3); // done scanning
remFrozen(_notReady.bucket(3));
- EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(1u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
EXPECT_EQ(4u, calcAsked().size());
EXPECT_EQ(_notReady.bucket(4), calcAsked()[3]);
- _bmj.scanAndMove(0, 2); // move all docs from bucket 3 again
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(0, 2); // move all docs from bucket 3 again
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(3u, docsMoved().size());
EXPECT_EQ(1u, bucketsModified().size());
EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]);
@@ -581,14 +580,14 @@ TEST_F(ControllerFixture, require_that_current_bucket_mover_is_not_cancelled_if_
addReady(_ready.bucket(2));
addReady(_notReady.bucket(3));
addReady(_notReady.bucket(4));
- _bmj.scanAndMove(3, 1); // move 1 doc from bucket 3
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(3, 1); // move 1 doc from bucket 3
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(1u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
EXPECT_EQ(3u, calcAsked().size());
addFrozen(_notReady.bucket(4));
- _bmj.scanAndMove(1, 2); // move rest of docs from bucket 3
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(1, 2); // move rest of docs from bucket 3
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(2u, docsMoved().size());
EXPECT_EQ(1u, bucketsModified().size());
EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]);
@@ -600,15 +599,15 @@ TEST_F(ControllerFixture, require_that_active_bucket_is_not_moved_from_ready_to_
// bucket 1 should be moved but is active
addReady(_ready.bucket(2));
activateBucket(_ready.bucket(1));
- _bmj.scanAndMove(4, 3); // scan all, delay active bucket 1
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(4, 3); // scan all, delay active bucket 1
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
deactivateBucket(_ready.bucket(1));
- EXPECT_FALSE(_bmj.done());
- _bmj.scanAndMove(0, 3); // move delayed and de-activated bucket 1
- EXPECT_TRUE(_bmj.done());
+ EXPECT_FALSE(_bmj->done());
+ _bmj->scanAndMove(0, 3); // move delayed and de-activated bucket 1
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(3u, docsMoved().size());
EXPECT_EQ(1u, bucketsModified().size());
EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]);
@@ -619,21 +618,21 @@ TEST_F(OnlyReadyControllerFixture, require_that_de_activated_bucket_is_moved_bef
// bucket 1, 2, 3 should be moved (but bucket 1 is active)
addReady(_ready.bucket(4));
activateBucket(_ready.bucket(1));
- _bmj.scanAndMove(2, 4); // delay bucket 1, move bucket 2
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(2, 4); // delay bucket 1, move bucket 2
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(2u, docsMoved().size());
EXPECT_EQ(1u, bucketsModified().size());
EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
deactivateBucket(_ready.bucket(1));
- _bmj.scanAndMove(2, 4); // move de-activated bucket 1
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(2, 4); // move de-activated bucket 1
+ EXPECT_FALSE(_bmj->done());
EXPECT_EQ(3u, docsMoved().size());
EXPECT_EQ(2u, bucketsModified().size());
EXPECT_EQ(_ready.bucket(1), bucketsModified()[1]);
- _bmj.scanAndMove(2, 4); // move bucket 3
- // EXPECT_TRUE(_bmj.done()); // TODO(geirst): fix this
+ _bmj->scanAndMove(2, 4); // move bucket 3
+ // EXPECT_TRUE(_bmj->done()); // TODO(geirst): fix this
EXPECT_EQ(6u, docsMoved().size());
EXPECT_EQ(3u, bucketsModified().size());
EXPECT_EQ(_ready.bucket(3), bucketsModified()[2]);
@@ -644,14 +643,14 @@ TEST_F(ControllerFixture, require_that_de_activated_bucket_is_not_moved_if_new_c
// bucket 1 should be moved
addReady(_ready.bucket(2));
activateBucket(_ready.bucket(1));
- _bmj.scanAndMove(4, 3); // scan all, delay active bucket 1
+ _bmj->scanAndMove(4, 3); // scan all, delay active bucket 1
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
deactivateBucket(_ready.bucket(1));
addReady(_ready.bucket(1));
changeCalc();
- _bmj.scanAndMove(0, 3); // consider delayed bucket 3
+ _bmj->scanAndMove(0, 3); // consider delayed bucket 3
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
EXPECT_EQ(1u, calcAsked().size());
@@ -663,18 +662,18 @@ TEST_F(ControllerFixture, require_that_de_activated_bucket_is_not_moved_if_froze
// bucket 1 should be moved
addReady(_ready.bucket(2));
activateBucket(_ready.bucket(1));
- _bmj.scanAndMove(4, 3); // scan all, delay active bucket 1
+ _bmj->scanAndMove(4, 3); // scan all, delay active bucket 1
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
addFrozen(_ready.bucket(1));
deactivateBucket(_ready.bucket(1));
- _bmj.scanAndMove(0, 3); // bucket 1 de-activated but frozen
+ _bmj->scanAndMove(0, 3); // bucket 1 de-activated but frozen
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
remFrozen(_ready.bucket(1));
- _bmj.scanAndMove(0, 3); // handle thawed bucket 1
+ _bmj->scanAndMove(0, 3); // handle thawed bucket 1
EXPECT_EQ(3u, docsMoved().size());
EXPECT_EQ(1u, bucketsModified().size());
EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]);
@@ -685,18 +684,18 @@ TEST_F(ControllerFixture, require_that_thawed_bucket_is_not_moved_if_active_as_w
// bucket 1 should be moved
addReady(_ready.bucket(2));
addFrozen(_ready.bucket(1));
- _bmj.scanAndMove(4, 3); // scan all, delay frozen bucket 1
+ _bmj->scanAndMove(4, 3); // scan all, delay frozen bucket 1
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
activateBucket(_ready.bucket(1));
remFrozen(_ready.bucket(1));
- _bmj.scanAndMove(0, 3); // bucket 1 thawed but active
+ _bmj->scanAndMove(0, 3); // bucket 1 thawed but active
EXPECT_EQ(0u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
deactivateBucket(_ready.bucket(1));
- _bmj.scanAndMove(0, 3); // handle de-activated bucket 1
+ _bmj->scanAndMove(0, 3); // handle de-activated bucket 1
EXPECT_EQ(3u, docsMoved().size());
EXPECT_EQ(1u, bucketsModified().size());
EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]);
@@ -707,8 +706,8 @@ TEST_F(ControllerFixture, ready_bucket_not_moved_to_not_ready_if_node_is_marked_
_calc->setNodeRetired(true);
// Bucket 2 would be moved from ready to not ready in a non-retired case, but not when retired.
addReady(_ready.bucket(1));
- _bmj.scanAndMove(4, 3);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(4, 3);
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(0u, docsMoved().size());
}
@@ -720,8 +719,8 @@ TEST_F(ControllerFixture, inactive_not_ready_bucket_not_moved_to_ready_if_node_i
addReady(_ready.bucket(1));
addReady(_ready.bucket(2));
addReady(_notReady.bucket(3));
- _bmj.scanAndMove(4, 3);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(4, 3);
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(0u, docsMoved().size());
}
@@ -732,8 +731,8 @@ TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_rea
addReady(_ready.bucket(2));
addReady(_notReady.bucket(3));
activateBucket(_notReady.bucket(3));
- _bmj.scanAndMove(4, 3);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(4, 3);
+ EXPECT_FALSE(_bmj->done());
ASSERT_EQ(2u, docsMoved().size());
assertEqual(_notReady.bucket(3), _notReady.docs(3)[0], 2, 1, docsMoved()[0]);
assertEqual(_notReady.bucket(3), _notReady.docs(3)[1], 2, 1, docsMoved()[1]);
@@ -743,19 +742,19 @@ TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_rea
TEST_F(ControllerFixture, require_that_notifyCreateBucket_causes_bucket_to_be_reconsidered_by_job)
{
- EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj->done());
addReady(_ready.bucket(1));
addReady(_ready.bucket(2));
runLoop();
- EXPECT_TRUE(_bmj.done());
+ EXPECT_TRUE(_bmj->done());
EXPECT_TRUE(docsMoved().empty());
EXPECT_TRUE(bucketsModified().empty());
addReady(_notReady.bucket(3)); // bucket 3 now ready, no notify
- EXPECT_TRUE(_bmj.done()); // move job still believes work done
- _bmj.notifyCreateBucket(_bucketDB->takeGuard(), _notReady.bucket(3)); // reconsider bucket 3
- EXPECT_FALSE(_bmj.done());
+ EXPECT_TRUE(_bmj->done()); // move job still believes work done
+ _bmj->notifyCreateBucket(_bucketDB->takeGuard(), _notReady.bucket(3)); // reconsider bucket 3
+ EXPECT_FALSE(_bmj->done());
runLoop();
- EXPECT_TRUE(_bmj.done());
+ EXPECT_TRUE(_bmj->done());
EXPECT_EQ(1u, bucketsModified().size());
EXPECT_EQ(2u, docsMoved().size());
}
@@ -769,18 +768,18 @@ struct ResourceLimitControllerFixture : public ControllerFixture
void testJobStopping(DiskMemUsageState blockingUsageState) {
// Bucket 1 should be moved
addReady(_ready.bucket(2));
- // Note: This depends on _bmj.run() moving max 1 documents
- EXPECT_TRUE(!_bmj.run());
+ // Note: This depends on _bmj->run() moving max 1 documents
+ EXPECT_TRUE(!_bmj->run());
EXPECT_EQ(1u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
// Notify that we've over limit
_diskMemUsageNotifier.notify(blockingUsageState);
- EXPECT_TRUE(_bmj.run());
+ EXPECT_TRUE(_bmj->run());
EXPECT_EQ(1u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
// Notify that we've under limit
_diskMemUsageNotifier.notify(DiskMemUsageState());
- EXPECT_TRUE(!_bmj.run());
+ EXPECT_TRUE(!_bmj->run());
EXPECT_EQ(2u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
}
@@ -788,13 +787,13 @@ struct ResourceLimitControllerFixture : public ControllerFixture
void testJobNotStopping(DiskMemUsageState blockingUsageState) {
// Bucket 1 should be moved
addReady(_ready.bucket(2));
- // Note: This depends on _bmj.run() moving max 1 documents
- EXPECT_TRUE(!_bmj.run());
+ // Note: This depends on _bmj->run() moving max 1 documents
+ EXPECT_TRUE(!_bmj->run());
EXPECT_EQ(1u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
// Notify that we've over limit, but not over adjusted limit
_diskMemUsageNotifier.notify(blockingUsageState);
- EXPECT_TRUE(!_bmj.run());
+ EXPECT_TRUE(!_bmj->run());
EXPECT_EQ(2u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
}
@@ -834,20 +833,20 @@ struct MaxOutstandingMoveOpsFixture : public ControllerFixture
}
void assertRunToBlocked() {
- EXPECT_TRUE(_bmj.run()); // job becomes blocked as max outstanding limit is reached
- EXPECT_FALSE(_bmj.done());
- EXPECT_TRUE(_bmj.isBlocked());
- EXPECT_TRUE(_bmj.isBlocked(BlockedReason::OUTSTANDING_OPS));
+ EXPECT_TRUE(_bmj->run()); // job becomes blocked as max outstanding limit is reached
+ EXPECT_FALSE(_bmj->done());
+ EXPECT_TRUE(_bmj->isBlocked());
+ EXPECT_TRUE(_bmj->isBlocked(BlockedReason::OUTSTANDING_OPS));
}
void assertRunToNotBlocked() {
- EXPECT_FALSE(_bmj.run());
- EXPECT_FALSE(_bmj.done());
- EXPECT_FALSE(_bmj.isBlocked());
+ EXPECT_FALSE(_bmj->run());
+ EXPECT_FALSE(_bmj->done());
+ EXPECT_FALSE(_bmj->isBlocked());
}
void assertRunToFinished() {
- EXPECT_TRUE(_bmj.run());
- EXPECT_TRUE(_bmj.done());
- EXPECT_FALSE(_bmj.isBlocked());
+ EXPECT_TRUE(_bmj->run());
+ EXPECT_TRUE(_bmj->done());
+ EXPECT_FALSE(_bmj->isBlocked());
}
void assertDocsMoved(uint32_t expDocsMovedCnt, uint32_t expMoveContextsCnt) {
EXPECT_EQ(expDocsMovedCnt, docsMoved().size());
@@ -856,7 +855,7 @@ struct MaxOutstandingMoveOpsFixture : public ControllerFixture
void unblockJob(uint32_t expRunnerCnt) {
_moveHandler.clearMoveDoneContexts(); // unblocks job and try to execute it via runner
EXPECT_EQ(expRunnerCnt, _runner.runCount);
- EXPECT_FALSE(_bmj.isBlocked());
+ EXPECT_FALSE(_bmj->isBlocked());
}
};
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
index 5223e20bd5c..866c3c45e02 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
@@ -42,7 +42,7 @@ struct ControllerFixtureBase : public ::testing::Test
DummyBucketExecutor _bucketExecutor;
MyMoveHandler _moveHandler;
DocumentDBTaggedMetrics _metrics;
- BucketMoveJobV2 _bmj;
+ std::shared_ptr<BucketMoveJobV2> _bmj;
MyCountJobRunner _runner;
ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts);
~ControllerFixtureBase();
@@ -89,11 +89,11 @@ struct ControllerFixtureBase : public ::testing::Test
return _calc->asked();
}
size_t numPending() {
- _bmj.updateMetrics(_metrics);
+ _bmj->updateMetrics(_metrics);
return _metrics.bucketMove.bucketsPending.getLast();
}
void runLoop() {
- while (!_bmj.isBlocked() && !_bmj.run()) {
+ while (!_bmj->isBlocked() && !_bmj->run()) {
}
}
void sync() {
@@ -123,11 +123,11 @@ ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig
_bucketExecutor(4),
_moveHandler(*_bucketDB, storeMoveDoneContexts),
_metrics("test", 1),
- _bmj(_calc, _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb,
- _notReady._subDb, _bucketCreateNotifier, _clusterStateHandler, _bucketHandler,
- _diskMemUsageNotifier, blockableConfig,
- "test", makeBucketSpace()),
- _runner(_bmj)
+ _bmj(std::make_shared<BucketMoveJobV2>(_calc, _moveHandler, _modifiedHandler, _master, _bucketExecutor,
+ _ready._subDb, _notReady._subDb, _bucketCreateNotifier,
+ _clusterStateHandler, _bucketHandler, _diskMemUsageNotifier,
+ blockableConfig, "test", makeBucketSpace())),
+ _runner(*_bmj)
{
}
@@ -165,13 +165,13 @@ struct OnlyReadyControllerFixture : public ControllerFixtureBase
TEST_F(ControllerFixture, require_that_nothing_is_moved_if_bucket_state_says_so)
{
- EXPECT_TRUE(_bmj.done());
+ EXPECT_TRUE(_bmj->done());
addReady(_ready.bucket(1));
addReady(_ready.bucket(2));
- _bmj.recompute();
+ _bmj->recompute();
masterExecute([this]() {
- EXPECT_TRUE(_bmj.scanAndMove(4, 3));
- EXPECT_TRUE(_bmj.done());
+ EXPECT_TRUE(_bmj->scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj->done());
});
EXPECT_TRUE(docsMoved().empty());
EXPECT_TRUE(bucketsModified().empty());
@@ -185,12 +185,12 @@ TEST_F(ControllerFixture, require_that_not_ready_bucket_is_moved_to_ready_if_buc
addReady(_notReady.bucket(4));
EXPECT_EQ(0, numPending());
- _bmj.recompute();
+ _bmj->recompute();
EXPECT_EQ(1, numPending());
masterExecute([this]() {
- EXPECT_FALSE(_bmj.done());
- EXPECT_TRUE(_bmj.scanAndMove(4, 3));
- EXPECT_TRUE(_bmj.done());
+ EXPECT_FALSE(_bmj->done());
+ EXPECT_TRUE(_bmj->scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj->done());
});
sync();
EXPECT_EQ(0, numPending());
@@ -206,11 +206,11 @@ TEST_F(ControllerFixture, require_that_ready_bucket_is_moved_to_not_ready_if_buc
{
// bucket 2 should be moved
addReady(_ready.bucket(1));
- _bmj.recompute();
+ _bmj->recompute();
masterExecute([this]() {
- EXPECT_FALSE(_bmj.done());
- EXPECT_TRUE(_bmj.scanAndMove(4, 3));
- EXPECT_TRUE(_bmj.done());
+ EXPECT_FALSE(_bmj->done());
+ EXPECT_TRUE(_bmj->scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj->done());
});
sync();
EXPECT_EQ(2u, docsMoved().size());
@@ -224,19 +224,19 @@ TEST_F(ControllerFixture, require_that_bucket_is_moved_even_with_error)
{
// bucket 2 should be moved
addReady(_ready.bucket(1));
- _bmj.recompute();
+ _bmj->recompute();
failRetrieveForLid(5);
masterExecute([this]() {
- EXPECT_FALSE(_bmj.done());
- EXPECT_TRUE(_bmj.scanAndMove(4, 3));
- EXPECT_TRUE(_bmj.done());
+ EXPECT_FALSE(_bmj->done());
+ EXPECT_TRUE(_bmj->scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj->done());
});
sync();
- EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj->done());
fixRetriever();
masterExecute([this]() {
- EXPECT_TRUE(_bmj.scanAndMove(4, 3));
- EXPECT_TRUE(_bmj.done());
+ EXPECT_TRUE(_bmj->scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj->done());
});
sync();
EXPECT_EQ(2u, docsMoved().size());
@@ -254,29 +254,29 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps)
addReady(_notReady.bucket(3));
addReady(_notReady.bucket(4));
- _bmj.recompute();
+ _bmj->recompute();
EXPECT_EQ(3, numPending());
masterExecute([this]() {
- EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj->done());
- EXPECT_FALSE(_bmj.scanAndMove(1, 2));
- EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj->scanAndMove(1, 2));
+ EXPECT_FALSE(_bmj->done());
});
sync();
EXPECT_EQ(2, numPending());
EXPECT_EQ(2u, docsMoved().size());
masterExecute([this]() {
- EXPECT_FALSE(_bmj.scanAndMove(1, 2));
- EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj->scanAndMove(1, 2));
+ EXPECT_FALSE(_bmj->done());
});
sync();
EXPECT_EQ(2, numPending());
EXPECT_EQ(4u, docsMoved().size());
masterExecute([this]() {
- EXPECT_FALSE(_bmj.scanAndMove(1, 2));
- EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj->scanAndMove(1, 2));
+ EXPECT_FALSE(_bmj->done());
});
sync();
EXPECT_EQ(1, numPending());
@@ -284,8 +284,8 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps)
// move bucket 4, docs 3
masterExecute([this]() {
- EXPECT_TRUE(_bmj.scanAndMove(1, 2));
- EXPECT_TRUE(_bmj.done());
+ EXPECT_TRUE(_bmj->scanAndMove(1, 2));
+ EXPECT_TRUE(_bmj->done());
});
sync();
EXPECT_EQ(0, numPending());
@@ -302,19 +302,19 @@ TEST_F(ControllerFixture, require_that_last_bucket_is_moved_before_reporting_don
addReady(_ready.bucket(1));
addReady(_ready.bucket(2));
addReady(_notReady.bucket(4));
- _bmj.recompute();
+ _bmj->recompute();
masterExecute([this]() {
- EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj->done());
- EXPECT_FALSE(_bmj.scanAndMove(1, 1));
- EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj->scanAndMove(1, 1));
+ EXPECT_FALSE(_bmj->done());
});
sync();
EXPECT_EQ(1u, docsMoved().size());
EXPECT_EQ(4u, calcAsked().size());
masterExecute([this]() {
- EXPECT_TRUE(_bmj.scanAndMove(1, 2));
- EXPECT_TRUE(_bmj.done());
+ EXPECT_TRUE(_bmj->scanAndMove(1, 2));
+ EXPECT_TRUE(_bmj->done());
});
sync();
EXPECT_EQ(3u, docsMoved().size());
@@ -326,12 +326,12 @@ TEST_F(ControllerFixture, require_that_active_bucket_is_not_moved_from_ready_to_
{
// bucket 1 should be moved but is active
addReady(_ready.bucket(2));
- _bmj.recompute();
- EXPECT_FALSE(_bmj.done());
+ _bmj->recompute();
+ EXPECT_FALSE(_bmj->done());
activateBucket(_ready.bucket(1));
masterExecute([this]() {
- EXPECT_TRUE(_bmj.scanAndMove(4, 3)); // scan all, delay active bucket 1
- EXPECT_TRUE(_bmj.done());
+ EXPECT_TRUE(_bmj->scanAndMove(4, 3)); // scan all, delay active bucket 1
+ EXPECT_TRUE(_bmj->done());
});
sync();
EXPECT_EQ(0u, docsMoved().size());
@@ -339,9 +339,9 @@ TEST_F(ControllerFixture, require_that_active_bucket_is_not_moved_from_ready_to_
deactivateBucket(_ready.bucket(1));
masterExecute([this]() {
- EXPECT_FALSE(_bmj.done());
- EXPECT_TRUE(_bmj.scanAndMove(4, 3)); // move delayed and de-activated bucket 1
- EXPECT_TRUE(_bmj.done());
+ EXPECT_FALSE(_bmj->done());
+ EXPECT_TRUE(_bmj->scanAndMove(4, 3)); // move delayed and de-activated bucket 1
+ EXPECT_TRUE(_bmj->done());
});
sync();
EXPECT_EQ(3u, docsMoved().size());
@@ -355,9 +355,9 @@ TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_w
addReady(_ready.bucket(2));
masterExecute([this]() {
- _bmj.recompute();
- _bmj.scanAndMove(1, 1);
- EXPECT_FALSE(_bmj.done());
+ _bmj->recompute();
+ _bmj->scanAndMove(1, 1);
+ EXPECT_FALSE(_bmj->done());
});
sync();
EXPECT_EQ(1u, docsMoved().size());
@@ -367,8 +367,8 @@ TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_w
EXPECT_EQ(4u, calcAsked().size());
EXPECT_EQ(_ready.bucket(1), calcAsked()[0]);
_calc->resetAsked();
- _bmj.scanAndMove(1, 1);
- EXPECT_FALSE(_bmj.done());
+ _bmj->scanAndMove(1, 1);
+ EXPECT_FALSE(_bmj->done());
});
sync();
EXPECT_EQ(1u, docsMoved().size());
@@ -383,7 +383,7 @@ TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_w
_calc->resetAsked();
changeCalc(); // not cancelled. No active bucket move
EXPECT_EQ(4u, calcAsked().size());
- _bmj.scanAndMove(1, 1);
+ _bmj->scanAndMove(1, 1);
});
sync();
EXPECT_EQ(1u, docsMoved().size());
@@ -391,9 +391,9 @@ TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_w
EXPECT_EQ(_ready.bucket(2), calcAsked()[1]);
EXPECT_EQ(_notReady.bucket(3), calcAsked()[2]);
masterExecute([this]() {
- _bmj.scanAndMove(2, 3);
+ _bmj->scanAndMove(2, 3);
});
- EXPECT_TRUE(_bmj.done());
+ EXPECT_TRUE(_bmj->done());
sync();
EXPECT_EQ(3u, docsMoved().size());
EXPECT_EQ(4u, calcAsked().size());
@@ -405,10 +405,10 @@ TEST_F(ControllerFixture, require_that_de_activated_bucket_is_not_moved_if_new_c
{
// bucket 1 should be moved
addReady(_ready.bucket(2));
- _bmj.recompute();
+ _bmj->recompute();
masterExecute([this]() {
activateBucket(_ready.bucket(1));
- _bmj.scanAndMove(4, 3); // scan all, delay active bucket 1
+ _bmj->scanAndMove(4, 3); // scan all, delay active bucket 1
});
sync();
EXPECT_EQ(0u, docsMoved().size());
@@ -418,7 +418,7 @@ TEST_F(ControllerFixture, require_that_de_activated_bucket_is_not_moved_if_new_c
deactivateBucket(_ready.bucket(1));
addReady(_ready.bucket(1));
changeCalc();
- _bmj.scanAndMove(4, 3); // consider delayed bucket 3
+ _bmj->scanAndMove(4, 3); // consider delayed bucket 3
});
sync();
EXPECT_EQ(0u, docsMoved().size());
@@ -433,9 +433,9 @@ TEST_F(ControllerFixture, ready_bucket_not_moved_to_not_ready_if_node_is_marked_
// Bucket 2 would be moved from ready to not ready in a non-retired case, but not when retired.
addReady(_ready.bucket(1));
masterExecute([this]() {
- _bmj.recompute();
- _bmj.scanAndMove(4, 3);
- EXPECT_TRUE(_bmj.done());
+ _bmj->recompute();
+ _bmj->scanAndMove(4, 3);
+ EXPECT_TRUE(_bmj->done());
});
sync();
EXPECT_EQ(0u, docsMoved().size());
@@ -450,9 +450,9 @@ TEST_F(ControllerFixture, inactive_not_ready_bucket_not_moved_to_ready_if_node_i
addReady(_ready.bucket(2));
addReady(_notReady.bucket(3));
masterExecute([this]() {
- _bmj.recompute();
- _bmj.scanAndMove(4, 3);
- EXPECT_TRUE(_bmj.done());
+ _bmj->recompute();
+ _bmj->scanAndMove(4, 3);
+ EXPECT_TRUE(_bmj->done());
});
sync();
EXPECT_EQ(0u, docsMoved().size());
@@ -464,11 +464,11 @@ TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_rea
addReady(_ready.bucket(1));
addReady(_ready.bucket(2));
addReady(_notReady.bucket(3));
- _bmj.recompute();
+ _bmj->recompute();
masterExecute([this]() {
activateBucket(_notReady.bucket(3));
- _bmj.scanAndMove(4, 3);
- EXPECT_TRUE(_bmj.done());
+ _bmj->scanAndMove(4, 3);
+ EXPECT_TRUE(_bmj->done());
});
sync();
ASSERT_EQ(2u, docsMoved().size());
@@ -481,27 +481,27 @@ TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_rea
TEST_F(ControllerFixture, require_that_notifyCreateBucket_causes_bucket_to_be_reconsidered_by_job)
{
- EXPECT_TRUE(_bmj.done());
+ EXPECT_TRUE(_bmj->done());
addReady(_ready.bucket(1));
addReady(_ready.bucket(2));
runLoop();
- EXPECT_TRUE(_bmj.done());
+ EXPECT_TRUE(_bmj->done());
sync();
EXPECT_TRUE(docsMoved().empty());
EXPECT_TRUE(bucketsModified().empty());
addReady(_notReady.bucket(3)); // bucket 3 now ready, no notify
- EXPECT_TRUE(_bmj.done()); // move job still believes work done
+ EXPECT_TRUE(_bmj->done()); // move job still believes work done
sync();
EXPECT_TRUE(bucketsModified().empty());
masterExecute([this]() {
- _bmj.notifyCreateBucket(_bucketDB->takeGuard(), _notReady.bucket(3)); // reconsider bucket 3
- EXPECT_FALSE(_bmj.done());
+ _bmj->notifyCreateBucket(_bucketDB->takeGuard(), _notReady.bucket(3)); // reconsider bucket 3
+ EXPECT_FALSE(_bmj->done());
EXPECT_TRUE(bucketsModified().empty());
});
sync();
EXPECT_TRUE(bucketsModified().empty());
runLoop();
- EXPECT_TRUE(_bmj.done());
+ EXPECT_TRUE(_bmj->done());
sync();
EXPECT_EQ(1u, bucketsModified().size());
@@ -518,22 +518,22 @@ struct ResourceLimitControllerFixture : public ControllerFixture
void testJobStopping(DiskMemUsageState blockingUsageState) {
// Bucket 1 should be moved
addReady(_ready.bucket(2));
- _bmj.recompute();
- EXPECT_FALSE(_bmj.done());
- // Note: This depends on _bmj.run() moving max 1 documents
- EXPECT_FALSE(_bmj.run());
+ _bmj->recompute();
+ EXPECT_FALSE(_bmj->done());
+ // Note: This depends on _bmj->run() moving max 1 documents
+ EXPECT_FALSE(_bmj->run());
sync();
EXPECT_EQ(1u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
// Notify that we've over limit
_diskMemUsageNotifier.notify(blockingUsageState);
- EXPECT_TRUE(_bmj.run());
+ EXPECT_TRUE(_bmj->run());
sync();
EXPECT_EQ(1u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
// Notify that we've under limit
_diskMemUsageNotifier.notify(DiskMemUsageState());
- EXPECT_FALSE(_bmj.run());
+ EXPECT_FALSE(_bmj->run());
sync();
EXPECT_EQ(2u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
@@ -542,16 +542,16 @@ struct ResourceLimitControllerFixture : public ControllerFixture
void testJobNotStopping(DiskMemUsageState blockingUsageState) {
// Bucket 1 should be moved
addReady(_ready.bucket(2));
- _bmj.recompute();
- EXPECT_FALSE(_bmj.done());
- // Note: This depends on _bmj.run() moving max 1 documents
- EXPECT_FALSE(_bmj.run());
+ _bmj->recompute();
+ EXPECT_FALSE(_bmj->done());
+ // Note: This depends on _bmj->run() moving max 1 documents
+ EXPECT_FALSE(_bmj->run());
sync();
EXPECT_EQ(1u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
// Notify that we've over limit, but not over adjusted limit
_diskMemUsageNotifier.notify(blockingUsageState);
- EXPECT_FALSE(_bmj.run());
+ EXPECT_FALSE(_bmj->run());
sync();
EXPECT_EQ(2u, docsMoved().size());
EXPECT_EQ(0u, bucketsModified().size());
@@ -600,24 +600,24 @@ struct MaxOutstandingMoveOpsFixture : public ControllerFixtureBase
_builder.createDocs(14, 4, 5);
_notReady.insertDocs(_builder.getDocs());
addReady(_ready.bucket(3));
- _bmj.recompute();
+ _bmj->recompute();
}
void assertRunToBlocked() {
- EXPECT_TRUE(_bmj.run()); // job becomes blocked as max outstanding limit is reached
- EXPECT_FALSE(_bmj.done());
- EXPECT_TRUE(_bmj.isBlocked());
- EXPECT_TRUE(_bmj.isBlocked(BlockedReason::OUTSTANDING_OPS));
+ EXPECT_TRUE(_bmj->run()); // job becomes blocked as max outstanding limit is reached
+ EXPECT_FALSE(_bmj->done());
+ EXPECT_TRUE(_bmj->isBlocked());
+ EXPECT_TRUE(_bmj->isBlocked(BlockedReason::OUTSTANDING_OPS));
}
void assertRunToNotBlocked() {
- EXPECT_FALSE(_bmj.run());
- EXPECT_FALSE(_bmj.done());
- EXPECT_FALSE(_bmj.isBlocked());
+ EXPECT_FALSE(_bmj->run());
+ EXPECT_FALSE(_bmj->done());
+ EXPECT_FALSE(_bmj->isBlocked());
}
void assertRunToFinished() {
- EXPECT_TRUE(_bmj.run());
- EXPECT_TRUE(_bmj.done());
- EXPECT_FALSE(_bmj.isBlocked());
+ EXPECT_TRUE(_bmj->run());
+ EXPECT_TRUE(_bmj->done());
+ EXPECT_FALSE(_bmj->isBlocked());
}
void assertDocsMoved(uint32_t expDocsMovedCnt, uint32_t expMoveContextsCnt) {
EXPECT_EQ(expDocsMovedCnt, docsMoved().size());
@@ -626,7 +626,7 @@ struct MaxOutstandingMoveOpsFixture : public ControllerFixtureBase
void unblockJob(uint32_t expRunnerCnt) {
_moveHandler.clearMoveDoneContexts(); // unblocks job and try to execute it via runner
EXPECT_EQ(expRunnerCnt, _runner.runCount);
- EXPECT_FALSE(_bmj.isBlocked());
+ EXPECT_FALSE(_bmj->isBlocked());
}
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
index fc2eeaab661..f0847ff051e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
@@ -4,7 +4,6 @@
#include "imaintenancejobrunner.h"
#include "ibucketstatechangednotifier.h"
#include "iclusterstatechangednotifier.h"
-#include "maintenancedocumentsubdb.h"
#include "i_disk_mem_usage_notifier.h"
#include "ibucketmodifiedhandler.h"
#include "move_operation_limiter.h"
@@ -87,8 +86,6 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &
_bucketsInFlight(),
_buckets2Move(),
_stopped(false),
- _startedCount(0),
- _executedCount(0),
_bucketsPending(0),
_bucketCreateNotifier(bucketCreateNotifier),
_clusterStateChangedNotifier(clusterStateChangedNotifier),
@@ -151,24 +148,10 @@ BucketMoveJobV2::needMove(const ScanIterator &itr) const {
return {true, wantReady};
}
-namespace {
-
-class IncOnDestruct {
-public:
- IncOnDestruct(std::atomic<size_t> & count) : _count(count) {}
- ~IncOnDestruct() {
- _count.fetch_add(1, std::memory_order_relaxed);
- }
-private:
- std::atomic<size_t> & _count;
-};
-
-}
-
-class StartMove : public storage::spi::BucketTask {
+class BucketMoveJobV2::StartMove : public storage::spi::BucketTask {
public:
using IDestructorCallbackSP = std::shared_ptr<vespalib::IDestructorCallback>;
- StartMove(BucketMoveJobV2 & job, std::shared_ptr<BucketMover> mover,
+ StartMove(std::shared_ptr<BucketMoveJobV2> job, std::shared_ptr<BucketMover> mover,
std::vector<BucketMover::MoveKey> keys,
IDestructorCallbackSP opsTracker)
: _job(job),
@@ -180,27 +163,27 @@ public:
void run(const Bucket &bucket, IDestructorCallbackSP onDone) override {
assert(_mover->getBucket() == bucket.getBucketId());
using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallbackSP, IDestructorCallbackSP>>;
- _job.prepareMove(std::move(_mover), std::move(_keys),
- std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone))));
+ BucketMoveJobV2::prepareMove(std::move(_job), std::move(_mover), std::move(_keys),
+ std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone))));
}
void fail(const Bucket &bucket) override {
- _job.failOperation(bucket.getBucketId());
+ BucketMoveJobV2::failOperation(std::move(_job), bucket.getBucketId());
}
private:
- BucketMoveJobV2 & _job;
+ std::shared_ptr<BucketMoveJobV2> _job;
std::shared_ptr<BucketMover> _mover;
std::vector<BucketMover::MoveKey> _keys;
IDestructorCallbackSP _opsTracker;
};
void
-BucketMoveJobV2::failOperation(BucketId bucketId) {
- IncOnDestruct countGuard(_executedCount);
- _master.execute(makeLambdaTask([this, bucketId]() {
- if (_stopped.load(std::memory_order_relaxed)) return;
- considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId);
+BucketMoveJobV2::failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bucketId) {
+ auto & master = job->_master;
+ master.execute(makeLambdaTask([job=std::move(job), bucketId]() {
+ if (job->_stopped.load(std::memory_order_relaxed)) return;
+ job->considerBucket(job->_ready.meta_store()->getBucketDB().takeGuard(), bucketId);
}));
}
@@ -213,19 +196,18 @@ BucketMoveJobV2::startMove(BucketMoverSP mover, size_t maxDocsToMove) {
if (keys.empty()) return;
mover->updateLastValidGid(keys.back()._gid);
Bucket spiBucket(document::Bucket(_bucketSpace, mover->getBucket()));
- auto bucketTask = std::make_unique<StartMove>(*this, std::move(mover), std::move(keys), getLimiter().beginOperation());
- _startedCount.fetch_add(1, std::memory_order_relaxed);
+ auto bucketTask = std::make_unique<StartMove>(shared_from_this(), std::move(mover), std::move(keys), getLimiter().beginOperation());
_bucketExecutor.execute(spiBucket, std::move(bucketTask));
}
void
-BucketMoveJobV2::prepareMove(BucketMoverSP mover, std::vector<MoveKey> keys, IDestructorCallbackSP onDone)
+BucketMoveJobV2::prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMoverSP mover, std::vector<MoveKey> keys, IDestructorCallbackSP onDone)
{
- IncOnDestruct countGuard(_executedCount);
auto moveOps = mover->createMoveOperations(std::move(keys));
- _master.execute(makeLambdaTask([this, mover=std::move(mover), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable {
- if (_stopped.load(std::memory_order_relaxed)) return;
- completeMove(std::move(mover), std::move(moveOps), std::move(onDone));
+ auto & master = job->_master;
+ master.execute(makeLambdaTask([job=std::move(job), mover=std::move(mover), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable {
+ if (job->_stopped.load(std::memory_order_relaxed)) return;
+ job->completeMove(std::move(mover), std::move(moveOps), std::move(onDone));
}));
}
@@ -437,18 +419,10 @@ BucketMoveJobV2::notifyDiskMemUsage(DiskMemUsageState state)
internalNotifyDiskMemUsage(state);
}
-bool
-BucketMoveJobV2::inSync() const {
- return _executedCount == _startedCount;
-}
-
void
BucketMoveJobV2::onStop() {
// Called by master write thread
_stopped = true;
- while ( ! inSync() ) {
- std::this_thread::sleep_for(1ms);
- }
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
index eba8fd9a62c..871bbcca15a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
@@ -7,6 +7,7 @@
#include "i_disk_mem_usage_listener.h"
#include "ibucketstatechangedhandler.h"
#include "iclusterstatechangedhandler.h"
+#include "maintenancedocumentsubdb.h"
#include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h>
#include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h>
@@ -61,8 +62,8 @@ private:
IBucketModifiedHandler &_modifiedHandler;
IThreadService &_master;
BucketExecutor &_bucketExecutor;
- const MaintenanceDocumentSubDB &_ready;
- const MaintenanceDocumentSubDB &_notReady;
+ const MaintenanceDocumentSubDB _ready;
+ const MaintenanceDocumentSubDB _notReady;
const document::BucketSpace _bucketSpace;
size_t _iterateCount;
Movers _movers;
@@ -70,8 +71,6 @@ private:
BucketMoveSet _buckets2Move;
std::atomic<bool> _stopped;
- std::atomic<size_t> _startedCount;
- std::atomic<size_t> _executedCount;
std::atomic<size_t> _bucketsPending;
bucketdb::IBucketCreateNotifier &_bucketCreateNotifier;
@@ -80,10 +79,10 @@ private:
IDiskMemUsageNotifier &_diskMemUsageNotifier;
void startMove(BucketMoverSP mover, size_t maxDocsToMove);
- void prepareMove(BucketMoverSP mover, std::vector<MoveKey> keysToMove, IDestructorCallbackSP context);
+ static void prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMoverSP mover,
+ std::vector<MoveKey> keysToMove, IDestructorCallbackSP context);
void completeMove(BucketMoverSP mover, GuardedMoveOps moveOps, IDestructorCallbackSP context);
bool checkIfMoverComplete(const BucketMover & mover);
- void checkForReschedule(const bucketdb::Guard & guard, BucketId bucket);
void considerBucket(const bucketdb::Guard & guard, BucketId bucket);
void reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket);
void updatePending();
@@ -94,9 +93,9 @@ private:
BucketMoverSP greedyCreateMover();
void backFillMovers();
void moveDocs(size_t maxDocsToMove);
- void failOperation(BucketId bucket);
+ static void failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bucket);
void recompute(const bucketdb::Guard & guard);
- friend class StartMove;
+ class StartMove;
public:
BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc,
IDocumentMoveHandler &moveHandler,
@@ -118,7 +117,6 @@ public:
bool scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket);
bool done() const;
void recompute(); // Only for testing
- bool inSync() const;
bool run() override;
void notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc) override;