summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-02-03 09:24:58 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-02-17 11:07:06 +0000
commitaa583fb4e59d126715b80d3ae5bd7cb483bfe623 (patch)
treec3cfe788549dbd7fccf00a73eaa5d66376ce4a8c /searchcore
parent2e55f8118174a1e6fe5faa5ca9daf88f4be82461 (diff)
Add alternative bucket move job without frozen concept.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt12
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h2
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp2
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp578
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp368
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h119
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp116
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h76
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp52
12 files changed, 1239 insertions, 100 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt
index 1aa0b1c585d..d11da09b737 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt
@@ -17,6 +17,18 @@ vespa_add_executable(searchcore_documentbucketmover_test_app TEST
)
vespa_add_test(NAME searchcore_documentbucketmover_test_app COMMAND searchcore_documentbucketmover_test_app)
+vespa_add_executable(searchcore_documentbucketmover_v2_test_app TEST
+ SOURCES
+ documentbucketmover_v2_test.cpp
+ DEPENDS
+ searchcore_bucketmover_test
+ searchcore_test
+ searchcore_server
+ searchcore_feedoperation
+ GTest::GTest
+)
+vespa_add_test(NAME searchcore_documentbucketmover_v2_test_app COMMAND searchcore_documentbucketmover_v2_test_app)
+
vespa_add_executable(searchcore_scaniterator_test_app TEST
SOURCES
scaniterator_test.cpp
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
index 65e206d7327..e2cd5e268d7 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h
@@ -82,7 +82,7 @@ struct MyDocumentRetriever : public DocumentRetrieverBaseForTest {
struct MyBucketModifiedHandler : public IBucketModifiedHandler {
using BucketId = document::BucketId;
- BucketId::List _modified;
+ std::vector<BucketId> _modified;
void notifyBucketModified(const BucketId &bucket) override;
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
index 502639d1dca..5556ed0d475 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
@@ -112,7 +112,7 @@ struct ControllerFixtureBase : public ::testing::Test
const MoveOperationVector &docsMoved() const {
return _moveHandler._moves;
}
- const BucketId::List &bucketsModified() const {
+ const std::vector<BucketId> &bucketsModified() const {
return _modifiedHandler._modified;
}
const BucketId::List &calcAsked() const {
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
new file mode 100644
index 00000000000..e7dc9b9e873
--- /dev/null
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
@@ -0,0 +1,578 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bucketmover_common.h"
+#include <vespa/searchcore/proton/server/bucketmovejobv2.h>
+#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP("document_bucket_mover_test");
+
+using namespace proton;
+using namespace proton::move::test;
+using document::BucketId;
+using document::test::makeBucketSpace;
+using proton::bucketdb::BucketCreateNotifier;
+using storage::spi::BucketInfo;
+using BlockedReason = IBlockableMaintenanceJob::BlockedReason;
+using MoveOperationVector = std::vector<MoveOperation>;
+using storage::spi::dummy::DummyBucketExecutor;
+using vespalib::ThreadStackExecutor;
+
+struct ControllerFixtureBase : public ::testing::Test
+{
+ test::UserDocumentsBuilder _builder;
+ test::BucketStateCalculator::SP _calc;
+ test::ClusterStateHandler _clusterStateHandler;
+ test::BucketHandler _bucketHandler;
+ MyBucketModifiedHandler _modifiedHandler;
+ std::shared_ptr<bucketdb::BucketDBOwner> _bucketDB;
+ MySubDb _ready;
+ MySubDb _notReady;
+ BucketCreateNotifier _bucketCreateNotifier;
+ test::DiskMemUsageNotifier _diskMemUsageNotifier;
+ ThreadStackExecutor _singleExecutor;
+ ExecutorThreadService _master;
+ DummyBucketExecutor _bucketExecutor;
+ MyMoveHandler _moveHandler;
+ BucketMoveJobV2 _bmj;
+ MyCountJobRunner _runner;
+ ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts);
+ ~ControllerFixtureBase();
+ ControllerFixtureBase &addReady(const BucketId &bucket) {
+ _calc->addReady(bucket);
+ return *this;
+ }
+ ControllerFixtureBase &remReady(const BucketId &bucket) {
+ _calc->remReady(bucket);
+ return *this;
+ }
+ ControllerFixtureBase &changeCalc() {
+ _calc->resetAsked();
+ _moveHandler.reset();
+ _modifiedHandler.reset();
+ _clusterStateHandler.notifyClusterStateChanged(_calc);
+ return *this;
+ }
+ ControllerFixtureBase &activateBucket(const BucketId &bucket) {
+ _ready.setBucketState(bucket, true);
+ _bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::ACTIVE);
+ return *this;
+ }
+ ControllerFixtureBase &deactivateBucket(const BucketId &bucket) {
+ _ready.setBucketState(bucket, false);
+ _bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::NOT_ACTIVE);
+ return *this;
+ }
+ const MoveOperationVector &docsMoved() const {
+ return _moveHandler._moves;
+ }
+ const std::vector<BucketId> &bucketsModified() const {
+ return _modifiedHandler._modified;
+ }
+ const BucketId::List &calcAsked() const {
+ return _calc->asked();
+ }
+ void runLoop() {
+ while (!_bmj.isBlocked() && !_bmj.run()) {
+ }
+ }
+ void sync() {
+ _bucketExecutor.sync();
+ _master.sync();
+ _master.sync(); // Handle that master schedules onto master again
+ }
+};
+
+ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts)
+ : _builder(),
+ _calc(std::make_shared<test::BucketStateCalculator>()),
+ _bucketHandler(),
+ _modifiedHandler(),
+ _bucketDB(std::make_shared<bucketdb::BucketDBOwner>()),
+ _ready(_builder.getRepo(), _bucketDB, 1, SubDbType::READY),
+ _notReady(_builder.getRepo(), _bucketDB, 2, SubDbType::NOTREADY),
+ _bucketCreateNotifier(),
+ _diskMemUsageNotifier(),
+ _singleExecutor(1, 0x10000),
+ _master(_singleExecutor),
+ _bucketExecutor(4),
+ _moveHandler(*_bucketDB, storeMoveDoneContexts),
+ _bmj(_calc, _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb,
+ _notReady._subDb, _bucketCreateNotifier, _clusterStateHandler, _bucketHandler,
+ _diskMemUsageNotifier, blockableConfig,
+ "test", makeBucketSpace()),
+ _runner(_bmj)
+{
+}
+
+ControllerFixtureBase::~ControllerFixtureBase() = default;
+constexpr double RESOURCE_LIMIT_FACTOR = 1.0;
+constexpr uint32_t MAX_OUTSTANDING_OPS = 10;
+const BlockableMaintenanceJobConfig BLOCKABLE_CONFIG(RESOURCE_LIMIT_FACTOR, MAX_OUTSTANDING_OPS);
+
+struct ControllerFixture : public ControllerFixtureBase
+{
+ ControllerFixture(const BlockableMaintenanceJobConfig &blockableConfig = BLOCKABLE_CONFIG)
+ : ControllerFixtureBase(blockableConfig, blockableConfig.getMaxOutstandingMoveOps() != MAX_OUTSTANDING_OPS)
+ {
+ _builder.createDocs(1, 1, 4); // 3 docs
+ _builder.createDocs(2, 4, 6); // 2 docs
+ _ready.insertDocs(_builder.getDocs());
+ _builder.clearDocs();
+ _builder.createDocs(3, 1, 3); // 2 docs
+ _builder.createDocs(4, 3, 6); // 3 docs
+ _notReady.insertDocs(_builder.getDocs());
+ }
+};
+
+struct OnlyReadyControllerFixture : public ControllerFixtureBase
+{
+ OnlyReadyControllerFixture() : ControllerFixtureBase(BLOCKABLE_CONFIG, false)
+ {
+ _builder.createDocs(1, 1, 2); // 1 docs
+ _builder.createDocs(2, 2, 4); // 2 docs
+ _builder.createDocs(3, 4, 7); // 3 docs
+ _builder.createDocs(4, 7, 11); // 4 docs
+ _ready.insertDocs(_builder.getDocs());
+ }
+};
+
+TEST_F(ControllerFixture, require_that_nothing_is_moved_if_bucket_state_says_so)
+{
+ EXPECT_TRUE(_bmj.done());
+ addReady(_ready.bucket(1));
+ addReady(_ready.bucket(2));
+ _bmj.recompute();
+ EXPECT_TRUE(_bmj.scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj.done());
+ EXPECT_TRUE(docsMoved().empty());
+ EXPECT_TRUE(bucketsModified().empty());
+}
+
+TEST_F(ControllerFixture, require_that_not_ready_bucket_is_moved_to_ready_if_bucket_state_says_so)
+{
+ // bucket 4 should be moved
+ addReady(_ready.bucket(1));
+ addReady(_ready.bucket(2));
+ addReady(_notReady.bucket(4));
+ _bmj.recompute();
+ EXPECT_FALSE(_bmj.done());
+ EXPECT_TRUE(_bmj.scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(3u, docsMoved().size());
+ assertEqual(_notReady.bucket(4), _notReady.docs(4)[0], 2, 1, docsMoved()[0]);
+ assertEqual(_notReady.bucket(4), _notReady.docs(4)[1], 2, 1, docsMoved()[1]);
+ assertEqual(_notReady.bucket(4), _notReady.docs(4)[2], 2, 1, docsMoved()[2]);
+ ASSERT_EQ(1u, bucketsModified().size());
+ EXPECT_EQ(_notReady.bucket(4), bucketsModified()[0]);
+}
+
+TEST_F(ControllerFixture, require_that_ready_bucket_is_moved_to_not_ready_if_bucket_state_says_so)
+{
+ // bucket 2 should be moved
+ addReady(_ready.bucket(1));
+ _bmj.recompute();
+ EXPECT_FALSE(_bmj.done());
+ EXPECT_TRUE(_bmj.scanAndMove(4, 3));
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(2u, docsMoved().size());
+ assertEqual(_ready.bucket(2), _ready.docs(2)[0], 1, 2, docsMoved()[0]);
+ assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]);
+ EXPECT_EQ(1u, bucketsModified().size());
+ EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
+}
+
+
+TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps)
+{
+ // bucket 2, 3, and 4 should be moved
+ addReady(_ready.bucket(1));
+ addReady(_notReady.bucket(3));
+ addReady(_notReady.bucket(4));
+
+ _bmj.recompute();
+ EXPECT_FALSE(_bmj.done());
+
+ EXPECT_FALSE(_bmj.scanAndMove(1, 2));
+ EXPECT_FALSE(_bmj.done());
+ sync();
+ EXPECT_EQ(2u, docsMoved().size());
+
+ EXPECT_FALSE(_bmj.scanAndMove(1, 2));
+ EXPECT_FALSE(_bmj.done());
+ sync();
+ EXPECT_EQ(4u, docsMoved().size());
+
+ EXPECT_FALSE(_bmj.scanAndMove(1, 2));
+ EXPECT_FALSE(_bmj.done());
+ sync();
+ EXPECT_EQ(6u, docsMoved().size());
+
+ // move bucket 4, docs 3
+ EXPECT_TRUE(_bmj.scanAndMove(1,2));
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(7u, docsMoved().size());
+ EXPECT_EQ(3u, bucketsModified().size());
+ EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]);
+ EXPECT_EQ(_notReady.bucket(3), bucketsModified()[1]);
+ EXPECT_EQ(_notReady.bucket(4), bucketsModified()[2]);
+}
+
+TEST_F(ControllerFixture, require_that_last_bucket_is_moved_before_reporting_done)
+{
+ // bucket 4 should be moved
+ addReady(_ready.bucket(1));
+ addReady(_ready.bucket(2));
+ addReady(_notReady.bucket(4));
+ _bmj.recompute();
+ EXPECT_FALSE(_bmj.done());
+
+ EXPECT_FALSE(_bmj.scanAndMove(1, 1));
+ EXPECT_FALSE(_bmj.done());
+ sync();
+ EXPECT_EQ(1u, docsMoved().size());
+ EXPECT_EQ(4u, calcAsked().size());
+ EXPECT_TRUE(_bmj.scanAndMove(1, 2));
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(3u, docsMoved().size());
+ EXPECT_EQ(4u, calcAsked().size());
+}
+
+
+TEST_F(ControllerFixture, require_that_active_bucket_is_not_moved_from_ready_to_not_ready_until_being_not_active)
+{
+ // bucket 1 should be moved but is active
+ addReady(_ready.bucket(2));
+ _bmj.recompute();
+ EXPECT_FALSE(_bmj.done());
+ activateBucket(_ready.bucket(1));
+ EXPECT_TRUE(_bmj.scanAndMove(4, 3)); // scan all, delay active bucket 1
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(0u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+
+ deactivateBucket(_ready.bucket(1));
+ EXPECT_FALSE(_bmj.done());
+ EXPECT_TRUE(_bmj.scanAndMove(4, 3)); // move delayed and de-activated bucket 1
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(3u, docsMoved().size());
+ EXPECT_EQ(1u, bucketsModified().size());
+ EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]);
+}
+
+TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_we_change_calculator)
+{
+ // bucket 1 should be moved
+ addReady(_ready.bucket(2));
+ _bmj.recompute();
+ _bmj.scanAndMove(1, 1);
+ EXPECT_FALSE(_bmj.done());
+ sync();
+ EXPECT_EQ(1u, docsMoved().size());
+ EXPECT_EQ(4u, calcAsked().size());
+ changeCalc(); // Not cancelled, bucket 1 still moving to notReady
+ EXPECT_EQ(4u, calcAsked().size());
+ EXPECT_EQ(_ready.bucket(1), calcAsked()[0]);
+ _calc->resetAsked();
+ _bmj.scanAndMove(1, 1);
+ EXPECT_FALSE(_bmj.done());
+ sync();
+ EXPECT_EQ(1u, docsMoved().size());
+ EXPECT_EQ(0u, calcAsked().size());
+ addReady(_ready.bucket(1));
+ changeCalc(); // cancelled, bucket 1 no longer moving to notReady
+ EXPECT_EQ(4u, calcAsked().size());
+ EXPECT_EQ(_ready.bucket(1), calcAsked()[0]);
+ _calc->resetAsked();
+ remReady(_ready.bucket(1));
+ _calc->resetAsked();
+ changeCalc(); // not cancelled. No active bucket move
+ EXPECT_EQ(4u, calcAsked().size());
+ _bmj.scanAndMove(1, 1);
+ sync();
+ EXPECT_EQ(1u, docsMoved().size());
+ EXPECT_EQ(4u, calcAsked().size());
+ EXPECT_EQ(_ready.bucket(2), calcAsked()[1]);
+ EXPECT_EQ(_notReady.bucket(3), calcAsked()[2]);
+ _bmj.scanAndMove(2, 3);
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(3u, docsMoved().size());
+ EXPECT_EQ(4u, calcAsked().size());
+ EXPECT_EQ(_notReady.bucket(4), calcAsked()[3]);
+ EXPECT_EQ(_ready.bucket(1), calcAsked()[0]);
+}
+
+TEST_F(ControllerFixture, require_that_de_activated_bucket_is_not_moved_if_new_calculator_does_not_say_so)
+{
+ // bucket 1 should be moved
+ addReady(_ready.bucket(2));
+ _bmj.recompute();
+ activateBucket(_ready.bucket(1));
+ _bmj.scanAndMove(4, 3); // scan all, delay active bucket 1
+ sync();
+ EXPECT_EQ(0u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+
+ deactivateBucket(_ready.bucket(1));
+ addReady(_ready.bucket(1));
+ changeCalc();
+ _bmj.scanAndMove(4, 3); // consider delayed bucket 3
+ sync();
+ EXPECT_EQ(0u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+ EXPECT_EQ(4u, calcAsked().size());
+ EXPECT_EQ(_ready.bucket(1), calcAsked()[0]);
+}
+
+TEST_F(ControllerFixture, ready_bucket_not_moved_to_not_ready_if_node_is_marked_as_retired)
+{
+ _calc->setNodeRetired(true);
+ // Bucket 2 would be moved from ready to not ready in a non-retired case, but not when retired.
+ addReady(_ready.bucket(1));
+ _bmj.recompute();
+ _bmj.scanAndMove(4, 3);
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(0u, docsMoved().size());
+}
+
+// Technically this should never happen since a retired node is never in the ideal state,
+// but test this case for the sake of completion.
+TEST_F(ControllerFixture, inactive_not_ready_bucket_not_moved_to_ready_if_node_is_marked_as_retired)
+{
+ _calc->setNodeRetired(true);
+ addReady(_ready.bucket(1));
+ addReady(_ready.bucket(2));
+ addReady(_notReady.bucket(3));
+ _bmj.recompute();
+ _bmj.scanAndMove(4, 3);
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_EQ(0u, docsMoved().size());
+}
+
+TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_ready_even_if_node_is_marked_as_retired)
+{
+ _calc->setNodeRetired(true);
+ addReady(_ready.bucket(1));
+ addReady(_ready.bucket(2));
+ addReady(_notReady.bucket(3));
+ _bmj.recompute();
+ activateBucket(_notReady.bucket(3));
+ _bmj.scanAndMove(4, 3);
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ ASSERT_EQ(2u, docsMoved().size());
+ assertEqual(_notReady.bucket(3), _notReady.docs(3)[0], 2, 1, docsMoved()[0]);
+ assertEqual(_notReady.bucket(3), _notReady.docs(3)[1], 2, 1, docsMoved()[1]);
+ ASSERT_EQ(1u, bucketsModified().size());
+ EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]);
+}
+
+
+TEST_F(ControllerFixture, require_that_notifyCreateBucket_causes_bucket_to_be_reconsidered_by_job)
+{
+ EXPECT_TRUE(_bmj.done());
+ addReady(_ready.bucket(1));
+ addReady(_ready.bucket(2));
+ runLoop();
+ EXPECT_TRUE(_bmj.done());
+ sync();
+ EXPECT_TRUE(docsMoved().empty());
+ EXPECT_TRUE(bucketsModified().empty());
+ addReady(_notReady.bucket(3)); // bucket 3 now ready, no notify
+ EXPECT_TRUE(_bmj.done()); // move job still believes work done
+ sync();
+ EXPECT_TRUE(bucketsModified().empty());
+ _bmj.notifyCreateBucket(_bucketDB->takeGuard(), _notReady.bucket(3)); // reconsider bucket 3
+ EXPECT_FALSE(_bmj.done());
+ EXPECT_TRUE(bucketsModified().empty());
+ sync();
+ EXPECT_TRUE(bucketsModified().empty());
+ runLoop();
+ EXPECT_TRUE(_bmj.done());
+ sync();
+
+ EXPECT_EQ(1u, bucketsModified().size());
+ EXPECT_EQ(2u, docsMoved().size());
+}
+
+
+struct ResourceLimitControllerFixture : public ControllerFixture
+{
+ ResourceLimitControllerFixture(double resourceLimitFactor = RESOURCE_LIMIT_FACTOR) :
+ ControllerFixture(BlockableMaintenanceJobConfig(resourceLimitFactor, MAX_OUTSTANDING_OPS))
+ {}
+
+ void testJobStopping(DiskMemUsageState blockingUsageState) {
+ // Bucket 1 should be moved
+ addReady(_ready.bucket(2));
+ _bmj.recompute();
+ EXPECT_FALSE(_bmj.done());
+ // Note: This depends on _bmj.run() moving max 1 documents
+ EXPECT_FALSE(_bmj.run());
+ sync();
+ EXPECT_EQ(1u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+ // Notify that we've over limit
+ _diskMemUsageNotifier.notify(blockingUsageState);
+ EXPECT_TRUE(_bmj.run());
+ sync();
+ EXPECT_EQ(1u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+ // Notify that we've under limit
+ _diskMemUsageNotifier.notify(DiskMemUsageState());
+ EXPECT_FALSE(_bmj.run());
+ sync();
+ EXPECT_EQ(2u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+ }
+
+ void testJobNotStopping(DiskMemUsageState blockingUsageState) {
+ // Bucket 1 should be moved
+ addReady(_ready.bucket(2));
+ _bmj.recompute();
+ EXPECT_FALSE(_bmj.done());
+ // Note: This depends on _bmj.run() moving max 1 documents
+ EXPECT_FALSE(_bmj.run());
+ sync();
+ EXPECT_EQ(1u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+ // Notify that we've over limit, but not over adjusted limit
+ _diskMemUsageNotifier.notify(blockingUsageState);
+ EXPECT_FALSE(_bmj.run());
+ sync();
+ EXPECT_EQ(2u, docsMoved().size());
+ EXPECT_EQ(0u, bucketsModified().size());
+ }
+};
+
+struct ResourceLimitControllerFixture_1_2 : public ResourceLimitControllerFixture {
+ ResourceLimitControllerFixture_1_2() : ResourceLimitControllerFixture(1.2) {}
+};
+
+TEST_F(ResourceLimitControllerFixture, require_that_bucket_move_stops_when_disk_limit_is_reached)
+{
+ testJobStopping(DiskMemUsageState(ResourceUsageState(0.7, 0.8), ResourceUsageState()));
+}
+
+TEST_F(ResourceLimitControllerFixture, require_that_bucket_move_stops_when_memory_limit_is_reached)
+{
+ testJobStopping(DiskMemUsageState(ResourceUsageState(), ResourceUsageState(0.7, 0.8)));
+}
+
+TEST_F(ResourceLimitControllerFixture_1_2, require_that_bucket_move_uses_resource_limit_factor_for_disk_resource_limit)
+{
+ testJobNotStopping(DiskMemUsageState(ResourceUsageState(0.7, 0.8), ResourceUsageState()));
+}
+
+TEST_F(ResourceLimitControllerFixture_1_2, require_that_bucket_move_uses_resource_limit_factor_for_memory_resource_limit)
+{
+ testJobNotStopping(DiskMemUsageState(ResourceUsageState(), ResourceUsageState(0.7, 0.8)));
+}
+
+
+struct MaxOutstandingMoveOpsFixture : public ControllerFixtureBase
+{
+ MaxOutstandingMoveOpsFixture(uint32_t maxOutstandingOps)
+ : ControllerFixtureBase(BlockableMaintenanceJobConfig(RESOURCE_LIMIT_FACTOR, maxOutstandingOps), true)
+ {
+ _builder.createDocs(1, 1, 2);
+ _builder.createDocs(2, 2, 3);
+ _builder.createDocs(3, 3, 4);
+ _builder.createDocs(4, 4, 5);
+ _ready.insertDocs(_builder.getDocs());
+ _builder.clearDocs();
+ _builder.createDocs(11, 1, 2);
+ _builder.createDocs(12, 2, 3);
+ _builder.createDocs(13, 3, 4);
+ _builder.createDocs(14, 4, 5);
+ _notReady.insertDocs(_builder.getDocs());
+ addReady(_ready.bucket(3));
+ _bmj.recompute();
+ }
+
+ void assertRunToBlocked() {
+ EXPECT_TRUE(_bmj.run()); // job becomes blocked as max outstanding limit is reached
+ EXPECT_FALSE(_bmj.done());
+ EXPECT_TRUE(_bmj.isBlocked());
+ EXPECT_TRUE(_bmj.isBlocked(BlockedReason::OUTSTANDING_OPS));
+ }
+ void assertRunToNotBlocked() {
+ EXPECT_FALSE(_bmj.run());
+ EXPECT_FALSE(_bmj.done());
+ EXPECT_FALSE(_bmj.isBlocked());
+ }
+ void assertRunToFinished() {
+ EXPECT_TRUE(_bmj.run());
+ EXPECT_TRUE(_bmj.done());
+ EXPECT_FALSE(_bmj.isBlocked());
+ }
+ void assertDocsMoved(uint32_t expDocsMovedCnt, uint32_t expMoveContextsCnt) {
+ EXPECT_EQ(expDocsMovedCnt, docsMoved().size());
+ EXPECT_EQ(expMoveContextsCnt, _moveHandler._moveDoneContexts.size());
+ }
+ void unblockJob(uint32_t expRunnerCnt) {
+ _moveHandler.clearMoveDoneContexts(); // unblocks job and try to execute it via runner
+ EXPECT_EQ(expRunnerCnt, _runner.runCount);
+ EXPECT_FALSE(_bmj.isBlocked());
+ }
+};
+
+struct MaxOutstandingMoveOpsFixture_1 : public MaxOutstandingMoveOpsFixture {
+ MaxOutstandingMoveOpsFixture_1() : MaxOutstandingMoveOpsFixture(1) {}
+};
+
+struct MaxOutstandingMoveOpsFixture_2 : public MaxOutstandingMoveOpsFixture {
+ MaxOutstandingMoveOpsFixture_2() : MaxOutstandingMoveOpsFixture(2) {}
+};
+
+TEST_F(MaxOutstandingMoveOpsFixture_1, require_that_bucket_move_job_is_blocked_if_it_has_too_many_outstanding_move_operations_max_1)
+{
+ assertRunToBlocked();
+ sync();
+ assertDocsMoved(1, 1);
+ assertRunToBlocked();
+ assertDocsMoved(1, 1);
+
+ unblockJob(1);
+ assertRunToBlocked();
+ sync();
+ assertDocsMoved(2, 1);
+
+ unblockJob(2);
+ assertRunToBlocked();
+ sync();
+ assertDocsMoved(3, 1);
+
+ unblockJob(3);
+ assertRunToFinished();
+ sync();
+ assertDocsMoved(3, 0);
+}
+
+TEST_F(MaxOutstandingMoveOpsFixture_2, require_that_bucket_move_job_is_blocked_if_it_has_too_many_outstanding_move_operations_max_2)
+{
+ assertRunToNotBlocked();
+ sync();
+ assertDocsMoved(1, 1);
+
+ assertRunToBlocked();
+ sync();
+ assertDocsMoved(2, 2);
+
+ unblockJob(2);
+ assertRunToFinished();
+ sync();
+ assertDocsMoved(3, 1);
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp
index 420f45d99e8..ae504fef603 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp
@@ -43,7 +43,7 @@ struct DocumentMoverTest : ::testing::Test
: _builder(),
_bucketDB(std::make_shared<bucketdb::BucketDBOwner>()),
_limiter(),
- _mover(_limiter),
+ _mover(_limiter, _bucketDb),
_source(_builder, _bucketDB, 0u, SubDbType::READY),
_bucketDb(),
_handler(_bucketDb)
@@ -58,7 +58,7 @@ struct DocumentMoverTest : ::testing::Test
_source._subDb.retriever(),
_source._subDb.feed_view(),
&_pendingLidsForCommit);
- _mover.setupForBucket(bucket, &_source._subDb, targetSubDbId, _handler, _bucketDb);
+ _mover.setupForBucket(bucket, &_source._subDb, targetSubDbId, _handler);
}
bool moveDocuments(size_t maxDocsToMove) {
return _mover.moveDocuments(maxDocsToMove);
@@ -68,7 +68,7 @@ struct DocumentMoverTest : ::testing::Test
TEST_F(DocumentMoverTest, require_that_initial_bucket_mover_is_done)
{
MyMoveOperationLimiter limiter;
- DocumentBucketMover mover(limiter);
+ DocumentBucketMover mover(limiter, _bucketDb);
EXPECT_TRUE(mover.bucketDone());
mover.moveDocuments(2);
EXPECT_TRUE(mover.bucketDone());
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
index 26fba8a780f..86d238c7554 100644
--- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
@@ -6,6 +6,7 @@ vespa_add_library(searchcore_server STATIC
bootstrapconfigmanager.cpp
buckethandler.cpp
bucketmovejob.cpp
+ bucketmovejobv2.cpp
clusterstatehandler.cpp
combiningfeedview.cpp
ddbstate.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
index 8b9b3c539d6..30b1e8b623c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
@@ -64,8 +64,7 @@ BucketMoveJob::checkBucket(const BucketId &bucket,
const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady);
LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)",
bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id());
- mover.setupForBucket(bucket, &source, target.sub_db_id(),
- _moveHandler, _ready.meta_store()->getBucketDB());
+ mover.setupForBucket(bucket, &source, target.sub_db_id(), _moveHandler);
}
BucketMoveJob::ScanResult
@@ -151,7 +150,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc,
_modifiedHandler(modifiedHandler),
_ready(ready),
_notReady(notReady),
- _mover(getLimiter()),
+ _mover(getLimiter(), _ready.meta_store()->getBucketDB()),
_doneScan(false),
_scanPos(),
_scanPass(ScanPass::FIRST),
@@ -161,7 +160,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc,
_delayedBucketsFrozen(),
_frozenBuckets(frozenBuckets),
_bucketCreateNotifier(bucketCreateNotifier),
- _delayedMover(getLimiter()),
+ _delayedMover(getLimiter(), _ready.meta_store()->getBucketDB()),
_clusterStateChangedNotifier(clusterStateChangedNotifier),
_bucketStateChangedNotifier(bucketStateChangedNotifier),
_diskMemUsageNotifier(diskMemUsageNotifier)
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
new file mode 100644
index 00000000000..77942b72478
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
@@ -0,0 +1,368 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bucketmovejobv2.h"
+#include "imaintenancejobrunner.h"
+#include "ibucketstatechangednotifier.h"
+#include "iclusterstatechangednotifier.h"
+#include "maintenancedocumentsubdb.h"
+#include "i_disk_mem_usage_notifier.h"
+#include "ibucketmodifiedhandler.h"
+#include "move_operation_limiter.h"
+#include "document_db_maintenance_config.h"
+#include <vespa/searchcore/proton/bucketdb/i_bucket_create_notifier.h>
+#include <vespa/searchcore/proton/feedoperation/moveoperation.h>
+#include <vespa/searchcore/proton/documentmetastore/i_document_meta_store.h>
+#include <vespa/searchcorespi/index/i_thread_service.h>
+#include <vespa/persistence/spi/bucket_tasks.h>
+#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <vespa/vespalib/util/lambdatask.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".proton.server.bucketmovejob");
+
+using document::BucketId;
+using storage::spi::BucketInfo;
+using storage::spi::Bucket;
+using storage::spi::makeBucketTask;
+using proton::bucketdb::BucketMover;
+using vespalib::makeLambdaTask;
+
+namespace proton {
+
+namespace {
+
+const char * bool2str(bool v) { return (v ? "T" : "F"); }
+
+bool
+blockedDueToClusterState(const IBucketStateCalculator::SP &calc)
+{
+ bool clusterUp = calc && calc->clusterUp();
+ bool nodeUp = calc && calc->nodeUp();
+ bool nodeInitializing = calc && calc->nodeInitializing();
+ return !(clusterUp && nodeUp && !nodeInitializing);
+}
+
+}
+
+BucketMoveJobV2::BucketMoveJobV2(const IBucketStateCalculator::SP &calc,
+ IDocumentMoveHandler &moveHandler,
+ IBucketModifiedHandler &modifiedHandler,
+ IThreadService & master,
+ BucketExecutor & bucketExecutor,
+ const MaintenanceDocumentSubDB &ready,
+ const MaintenanceDocumentSubDB &notReady,
+ bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
+ IClusterStateChangedNotifier &clusterStateChangedNotifier,
+ IBucketStateChangedNotifier &bucketStateChangedNotifier,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
+ const BlockableMaintenanceJobConfig &blockableConfig,
+ const vespalib::string &docTypeName,
+ document::BucketSpace bucketSpace)
+ : BlockableMaintenanceJob("move_buckets." + docTypeName, vespalib::duration::zero(), vespalib::duration::zero(), blockableConfig),
+ IClusterStateChangedHandler(),
+ bucketdb::IBucketCreateListener(),
+ IBucketStateChangedHandler(),
+ IDiskMemUsageListener(),
+ _calc(calc),
+ _moveHandler(moveHandler),
+ _modifiedHandler(modifiedHandler),
+ _master(master),
+ _bucketExecutor(bucketExecutor),
+ _ready(ready),
+ _notReady(notReady),
+ _bucketSpace(bucketSpace),
+ _iterateCount(0),
+ _movers(),
+ _buckets2Move(),
+ _stopped(false),
+ _startedCount(0),
+ _executedCount(0),
+ _bucketCreateNotifier(bucketCreateNotifier),
+ _clusterStateChangedNotifier(clusterStateChangedNotifier),
+ _bucketStateChangedNotifier(bucketStateChangedNotifier),
+ _diskMemUsageNotifier(diskMemUsageNotifier)
+{
+ _movers.reserve(std::min(100u, blockableConfig.getMaxOutstandingMoveOps()));
+ if (blockedDueToClusterState(_calc)) {
+ setBlocked(BlockedReason::CLUSTER_STATE);
+ }
+
+ _bucketCreateNotifier.addListener(this);
+ _clusterStateChangedNotifier.addClusterStateChangedHandler(this);
+ _bucketStateChangedNotifier.addBucketStateChangedHandler(this);
+ _diskMemUsageNotifier.addDiskMemUsageListener(this);
+ recompute();
+}
+
+BucketMoveJobV2::~BucketMoveJobV2()
+{
+ _bucketCreateNotifier.removeListener(this);
+ _clusterStateChangedNotifier.removeClusterStateChangedHandler(this);
+ _bucketStateChangedNotifier.removeBucketStateChangedHandler(this);
+ _diskMemUsageNotifier.removeDiskMemUsageListener(this);
+}
+
+BucketMoveJobV2::NeedResult
+BucketMoveJobV2::needMove(const ScanIterator &itr) const {
+ NeedResult noMove(false, false);
+ const bool hasReadyDocs = itr.hasReadyBucketDocs();
+ const bool hasNotReadyDocs = itr.hasNotReadyBucketDocs();
+ if (!hasReadyDocs && !hasNotReadyDocs) {
+ return noMove; // No documents for bucket in ready or notready subdbs
+ }
+ const bool isActive = itr.isActive();
+ // No point in moving buckets when node is retired and everything will be deleted soon.
+ // However, allow moving of explicitly activated buckets, as this implies a lack of other good replicas.
+ if (!_calc || (_calc->nodeRetired() && !isActive)) {
+ return noMove;
+ }
+ const bool shouldBeReady = _calc->shouldBeReady(document::Bucket(_bucketSpace, itr.getBucket()));
+ const bool wantReady = shouldBeReady || isActive;
+ LOG(spam, "checkBucket(): bucket(%s), shouldBeReady(%s), active(%s)",
+ itr.getBucket().toString().c_str(), bool2str(shouldBeReady), bool2str(isActive));
+ if (wantReady) {
+ if (!hasNotReadyDocs)
+ return noMove; // No notready bucket to make ready
+ } else {
+ if (isActive)
+ return noMove; // Do not move from ready to not ready when active
+ if (!hasReadyDocs)
+ return noMove; // No ready bucket to make notready
+ }
+ return {true, wantReady};
+}
+
+void
+BucketMoveJobV2::startMove(BucketMoverSP mover, size_t maxDocsToMove) {
+ auto [keys, done] = mover->getKeysToMove(maxDocsToMove);
+ if (done) {
+ mover->setBucketDone();
+ }
+ if (keys.empty()) return;
+ if (_stopped.load(std::memory_order_relaxed)) return;
+ mover->updateLastValidGid(keys.back()._gid);
+ auto context = getLimiter().beginOperation();
+ Bucket spiBucket(document::Bucket(_bucketSpace, mover->getBucket()));
+ auto bucketTask = makeBucketTask(
+ [this, mover=std::move(mover), keys=std::move(keys),opsTracker=getLimiter().beginOperation()]
+ (const Bucket & bucket, std::shared_ptr<IDestructorCallback> onDone) mutable
+ {
+ assert(mover->getBucket() == bucket.getBucketId());
+ using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallbackSP, IDestructorCallbackSP>>;
+ prepareMove(std::move(mover), std::move(keys),
+ std::make_shared<DoneContext>(std::make_pair(std::move(opsTracker), std::move(onDone))));
+ });
+ auto failed = _bucketExecutor.execute(spiBucket, std::move(bucketTask));
+ if (!failed) {
+ _startedCount.fetch_add(1, std::memory_order_relaxed);
+ }
+}
+
+namespace {
+
+class IncOnDestruct {
+public:
+ IncOnDestruct(std::atomic<size_t> & count) : _count(count) {}
+ ~IncOnDestruct() {
+ _count.fetch_add(1, std::memory_order_relaxed);
+ }
+private:
+ std::atomic<size_t> & _count;
+};
+
+}
+
+void
+BucketMoveJobV2::prepareMove(BucketMoverSP mover, std::vector<MoveKey> keys, IDestructorCallbackSP onDone)
+{
+ IncOnDestruct countGuard(_executedCount);
+ if (_stopped.load(std::memory_order_relaxed)) return;
+ auto moveOps = mover->createMoveOperations(keys);
+ _master.execute(makeLambdaTask([this, mover=std::move(mover), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable {
+ completeMove(std::move(mover), std::move(moveOps), std::move(onDone));
+ }));
+}
+
+void
+BucketMoveJobV2::completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> ops, IDestructorCallbackSP onDone) {
+ mover->moveDocuments(std::move(ops), std::move(onDone));
+ if (mover->bucketDone() && mover->inSync()) {
+ _modifiedHandler.notifyBucketModified(mover->getBucket());
+ }
+}
+
+void
+BucketMoveJobV2::cancelMovesForBucket(BucketId bucket) {
+ for (auto itr = _movers.begin(); itr != _movers.end(); itr++) {
+ if (bucket == (*itr)->getBucket()) {
+ _movers.erase(itr);
+ backFillMovers();
+ return;
+ }
+ }
+}
+
+void
+BucketMoveJobV2::considerBucket(const bucketdb::Guard & guard, BucketId bucket)
+{
+ ScanIterator itr(guard, bucket);
+ auto [mustMove, wantReady] = needMove(itr);
+ if (mustMove) {
+ _buckets2Move[bucket] = wantReady;
+ } else {
+ _buckets2Move.erase(bucket);
+ cancelMovesForBucket(bucket);
+ }
+ backFillMovers();
+ considerRun();
+}
+
+void
+BucketMoveJobV2::notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket)
+{
+ considerBucket(guard, bucket);
+}
+
+BucketMoveJobV2::BucketSet
+BucketMoveJobV2::computeBuckets2Move()
+{
+ BucketMoveJobV2::BucketSet toMove;
+ for (ScanIterator itr(_ready.meta_store()->getBucketDB().takeGuard(), BucketId()); itr.valid(); ++itr) {
+ auto [mustMove, wantReady] = needMove(itr);
+ if (mustMove) {
+ toMove[itr.getBucket()] = wantReady;
+ }
+ }
+ return toMove;
+}
+
+std::shared_ptr<BucketMover>
+BucketMoveJobV2::createMover(BucketId bucket, bool wantReady) {
+ const MaintenanceDocumentSubDB &source(wantReady ? _notReady : _ready);
+ const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady);
+ LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)",
+ bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id());
+ return std::make_shared<BucketMover>(bucket, &source, target.sub_db_id(), _moveHandler);
+}
+
+std::shared_ptr<BucketMover>
+BucketMoveJobV2::greedyCreateMover() {
+ if ( ! _buckets2Move.empty()) {
+ auto next = _buckets2Move.begin();
+ auto mover = createMover(next->first, next->second);
+ _buckets2Move.erase(next->first);
+ return mover;
+ }
+ return {};
+}
+
+bool
+BucketMoveJobV2::moveDocs(size_t maxDocsToMove) {
+ if (done()) return true;
+
+ // Select mover
+ size_t index = _iterateCount++ % _movers.size();
+ const auto & mover = _movers[index];
+
+ // Move, or reduce movers as we are tailing off
+ if (!mover->bucketDone()) {
+ startMove(mover, maxDocsToMove);
+ if (mover->bucketDone()) {
+ auto next = greedyCreateMover();
+ if (next) {
+ _movers[index] = next;
+ } else {
+ _movers.erase(_movers.begin() + index);
+ }
+ }
+ }
+ return done();
+}
+
+bool
+BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket) {
+ for (size_t i(0); i < maxBuckets2Move; i++) {
+ moveDocs(maxDocsToMovePerBucket);
+ }
+ return isBlocked() || done();
+}
+
+bool
+BucketMoveJobV2::done() const {
+ return _buckets2Move.empty() && _movers.empty() && !isBlocked();
+}
+
+bool
+BucketMoveJobV2::run()
+{
+ if (isBlocked()) {
+ return true; // indicate work is done, since node state is bad
+ }
+ /// Returning false here will immediately post the job back on the executor. This will give a busy loop,
+ /// but this is considered fine as it is very rare and it will be intermingled with multiple feed operations.
+ if ( ! scanAndMove(1, 1) ) {
+ return false;
+ }
+
+ if (isBlocked(BlockedReason::OUTSTANDING_OPS)) {
+ return true;
+ }
+ return done();
+}
+
+void
+BucketMoveJobV2::recompute() {
+ _movers.clear();
+ _buckets2Move = computeBuckets2Move();
+ backFillMovers();
+}
+
+void
+BucketMoveJobV2::backFillMovers() {
+ // Ensure we have enough movers.
+ while ( ! _buckets2Move.empty() && (_movers.size() < _movers.capacity())) {
+ _movers.push_back(greedyCreateMover());
+ }
+}
+void
+BucketMoveJobV2::notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc)
+{
+ // Called by master write thread
+ _calc = newCalc;
+ recompute();
+ if (blockedDueToClusterState(_calc)) {
+ setBlocked(BlockedReason::CLUSTER_STATE);
+ } else {
+ unBlock(BlockedReason::CLUSTER_STATE);
+ }
+}
+
+void
+BucketMoveJobV2::notifyBucketStateChanged(const BucketId &bucketId, BucketInfo::ActiveState)
+{
+ // Called by master write thread
+ considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId);
+}
+
+void
+BucketMoveJobV2::notifyDiskMemUsage(DiskMemUsageState state)
+{
+ // Called by master write thread
+ internalNotifyDiskMemUsage(state);
+}
+
+bool
+BucketMoveJobV2::inSync() const {
+ return _executedCount == _startedCount;
+}
+
+void
+BucketMoveJobV2::onStop() {
+ // Called by master write thread
+ _stopped = true;
+ while ( ! inSync() ) {
+ std::this_thread::sleep_for(1ms);
+ }
+}
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
new file mode 100644
index 00000000000..714f24b6ff6
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
@@ -0,0 +1,119 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "blockable_maintenance_job.h"
+#include "documentbucketmover.h"
+#include "i_disk_mem_usage_listener.h"
+#include "ibucketstatechangedhandler.h"
+#include "iclusterstatechangedhandler.h"
+#include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h>
+#include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h>
+
+namespace storage::spi { struct BucketExecutor; }
+namespace searchcorespi::index { struct IThreadService; }
+
+namespace proton {
+
+class BlockableMaintenanceJobConfig;
+class IBucketStateChangedNotifier;
+class IClusterStateChangedNotifier;
+class IDiskMemUsageNotifier;
+class IBucketModifiedHandler;
+
+namespace bucketdb { class IBucketCreateNotifier; }
+
+/**
+ * Class used to control the moving of buckets between the ready and
+ * not ready sub databases based on the readiness of buckets according to the cluster state.
+ * It will first compute the set of buckets to be moved. Then N of these buckets will be iterated in parallel and
+ * the documents scheduled for move. The movment will happen in 3 phases.
+ * 1 - Collect meta info for documents. Must happend in master thread
+ * 2 - Acquire bucket lock and fetch documents and very against meta data. This is done in BucketExecutor threads.
+ * 3 - Actual movement is then done in master thread while still holding bucket lock. Once bucket has fully moved
+ * bucket modified notification is sent.
+ */
+class BucketMoveJobV2 : public BlockableMaintenanceJob,
+ public IClusterStateChangedHandler,
+ public bucketdb::IBucketCreateListener,
+ public IBucketStateChangedHandler,
+ public IDiskMemUsageListener
+{
+private:
+ using BucketExecutor = storage::spi::BucketExecutor;
+ using IDestructorCallback = vespalib::IDestructorCallback;
+ using IDestructorCallbackSP = std::shared_ptr<IDestructorCallback>;
+ using IThreadService = searchcorespi::index::IThreadService;
+ using BucketId = document::BucketId;
+ using ScanIterator = bucketdb::ScanIterator;
+ using BucketSet = std::map<BucketId, bool>;
+ using NeedResult = std::pair<bool, bool>;
+ using ActiveState = storage::spi::BucketInfo::ActiveState;
+ using BucketMover = bucketdb::BucketMover;
+ using BucketMoverSP = std::shared_ptr<BucketMover>;
+ using Movers = std::vector<std::shared_ptr<BucketMover>>;
+ using MoveKey = BucketMover::MoveKey;
+ using GuardedMoveOp = BucketMover::GuardedMoveOp;
+ std::shared_ptr<IBucketStateCalculator> _calc;
+ IDocumentMoveHandler &_moveHandler;
+ IBucketModifiedHandler &_modifiedHandler;
+ IThreadService &_master;
+ BucketExecutor &_bucketExecutor;
+ const MaintenanceDocumentSubDB &_ready;
+ const MaintenanceDocumentSubDB &_notReady;
+ const document::BucketSpace _bucketSpace;
+ size_t _iterateCount;
+ Movers _movers;
+ BucketSet _buckets2Move;
+ std::atomic<bool> _stopped;
+ std::atomic<size_t> _startedCount;
+ std::atomic<size_t> _executedCount;
+
+ bucketdb::IBucketCreateNotifier &_bucketCreateNotifier;
+ IClusterStateChangedNotifier &_clusterStateChangedNotifier;
+ IBucketStateChangedNotifier &_bucketStateChangedNotifier;
+ IDiskMemUsageNotifier &_diskMemUsageNotifier;
+
+ void startMove(BucketMoverSP mover, size_t maxDocsToMove);
+ void prepareMove(BucketMoverSP mover, std::vector<MoveKey> keysToMove, IDestructorCallbackSP context);
+ void completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> keys, IDestructorCallbackSP context);
+ void considerBucket(const bucketdb::Guard & guard, BucketId bucket);
+ NeedResult needMove(const ScanIterator &itr) const;
+ BucketSet computeBuckets2Move();
+ BucketMoverSP createMover(BucketId bucket, bool wantReady);
+ BucketMoverSP greedyCreateMover();
+ void backFillMovers();
+ void cancelMovesForBucket(BucketId bucket);
+ bool moveDocs(size_t maxDocsToMove);
+public:
+ BucketMoveJobV2(const IBucketStateCalculator::SP &calc,
+ IDocumentMoveHandler &moveHandler,
+ IBucketModifiedHandler &modifiedHandler,
+ IThreadService & master,
+ BucketExecutor & bucketExecutor,
+ const MaintenanceDocumentSubDB &ready,
+ const MaintenanceDocumentSubDB &notReady,
+ bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
+ IClusterStateChangedNotifier &clusterStateChangedNotifier,
+ IBucketStateChangedNotifier &bucketStateChangedNotifier,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
+ const BlockableMaintenanceJobConfig &blockableConfig,
+ const vespalib::string &docTypeName,
+ document::BucketSpace bucketSpace);
+
+ ~BucketMoveJobV2() override;
+
+ bool scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket);
+ bool done() const;
+ void recompute();
+ bool inSync() const;
+
+ bool run() override;
+ void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override;
+ void notifyBucketStateChanged(const BucketId &bucketId, ActiveState newState) override;
+ void notifyDiskMemUsage(DiskMemUsageState state) override;
+ void notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) override;
+ void onStop() override;
+};
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
index 76a65df4d1a..6defd3e7037 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
@@ -20,8 +20,8 @@ namespace proton::bucketdb {
typedef IDocumentMetaStore::Iterator Iterator;
-MoveOperation::UP
-BucketMover::createMoveOperation(const MoveKey &key) {
+BucketMover::GuardedMoveOp
+BucketMover::createMoveOperation(MoveKey &key) {
if (_source->lidNeedsCommit(key._lid)) {
return {};
}
@@ -29,9 +29,10 @@ BucketMover::createMoveOperation(const MoveKey &key) {
if (!doc || doc->getId().getGlobalId() != key._gid)
return {}; // Failed to retrieve document, removed or changed identity
BucketId bucketId = _bucket.stripUnused();
- return std::make_unique<MoveOperation>(bucketId, key._timestamp, std::move(doc),
- DbDocumentId(_source->sub_db_id(), key._lid),
- _targetSubDbId);
+ return BucketMover::GuardedMoveOp(std::make_unique<MoveOperation>(bucketId, key._timestamp, std::move(doc),
+ DbDocumentId(_source->sub_db_id(), key._lid),
+ _targetSubDbId),
+ std::move(key._guard));
}
void
@@ -39,21 +40,34 @@ BucketMover::moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone)
_handler->handleMove(*moveOp, std::move(onDone));
}
+BucketMover::MoveKey::MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp, MoveGuard guard) noexcept
+ : _lid(lid),
+ _gid(gid),
+ _timestamp(timestamp),
+ _guard(std::move(guard))
+{ }
+
+BucketMover::MoveKey::~MoveKey() = default;
-BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB *source, uint32_t targetSubDbId,
- IDocumentMoveHandler &handler, BucketDBOwner &bucketDb) noexcept
+BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB *source,
+ uint32_t targetSubDbId, IDocumentMoveHandler &handler) noexcept
: _source(source),
_handler(&handler),
- _bucketDb(&bucketDb),
_bucket(bucket),
_targetSubDbId(targetSubDbId),
+ _started(0),
+ _completed(0),
_bucketDone(false),
- _lastGid(),
- _lastGidValid(false)
+ _lastGidValid(false),
+ _lastGid()
{ }
+BucketMover::~BucketMover() {
+ assert(inSync());
+}
+
std::pair<std::vector<BucketMover::MoveKey>, bool>
-BucketMover::getKeysToMove(size_t maxDocsToMove) const {
+BucketMover::getKeysToMove(size_t maxDocsToMove) {
std::pair<std::vector<BucketMover::MoveKey>, bool> result;
Iterator itr = (_lastGidValid ? _source->meta_store()->upperBound(_lastGid)
: _source->meta_store()->lowerBound(_bucket));
@@ -63,7 +77,7 @@ BucketMover::getKeysToMove(size_t maxDocsToMove) const {
uint32_t lid = itr.getKey().get_lid();
const RawDocumentMetaData &metaData = _source->meta_store()->getRawMetaData(lid);
if (metaData.getBucketUsedBits() == _bucket.getUsedBits()) {
- result.first.emplace_back(lid, metaData.getGid(), metaData.getTimestamp());
+ result.first.emplace_back(lid, metaData.getGid(), metaData.getTimestamp(), MoveGuard(*this));
++docsMoved;
}
}
@@ -71,13 +85,13 @@ BucketMover::getKeysToMove(size_t maxDocsToMove) const {
return result;
}
-std::vector<MoveOperation::UP>
-BucketMover::createMoveOperations(const std::vector<MoveKey> &toMove) {
- std::vector<MoveOperation::UP> successfulReads;
+std::vector<BucketMover::GuardedMoveOp>
+BucketMover::createMoveOperations(std::vector<MoveKey> &toMove) {
+ std::vector<GuardedMoveOp> successfulReads;
successfulReads.reserve(toMove.size());
- for (const MoveKey &key : toMove) {
+ for (MoveKey &key : toMove) {
auto moveOp = createMoveOperation(key);
- if (!moveOp) {
+ if (!moveOp.first) {
break;
}
successfulReads.push_back(std::move(moveOp));
@@ -86,37 +100,10 @@ BucketMover::createMoveOperations(const std::vector<MoveKey> &toMove) {
}
void
-BucketMover::moveDocuments(std::vector<MoveOperation::UP> moveOps, IDestructorCallbackSP onDone) {
- for (auto & moveOp : moveOps) {
- moveDocument(std::move(moveOp), onDone);
- }
-}
-
-bool
-BucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter)
-{
- if (_bucketDone) {
- return true;
- }
- auto [keys, done] = getKeysToMove(maxDocsToMove);
- auto moveOps = createMoveOperations(keys);
- bool allOk = keys.size() == moveOps.size();
- if (done && allOk) {
- setBucketDone();
- }
- if (moveOps.empty()) return allOk;
-
- updateLastValidGid(moveOps.back()->getDocument()->getId().getGlobalId());
-
+BucketMover::moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallbackSP onDone) {
for (auto & moveOp : moveOps) {
- // We cache the bucket for the document we are going to move to avoid getting
- // inconsistent bucket info (getBucketInfo()) while moving between ready and not-ready
- // sub dbs as the bucket info is not updated atomically in this case.
- _bucketDb->takeGuard()->cacheBucket(moveOp->getBucketId());
- _handler->handleMove(*moveOp, limiter.beginOperation());
- _bucketDb->takeGuard()->uncacheBucket();
+ moveDocument(std::move(moveOp.first), std::move(onDone));
}
- return allOk;
}
}
@@ -124,22 +111,51 @@ BucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter)
namespace proton {
using bucketdb::BucketMover;
+using bucketdb::BucketDBOwner;
-DocumentBucketMover::DocumentBucketMover(IMoveOperationLimiter &limiter) noexcept
+DocumentBucketMover::DocumentBucketMover(IMoveOperationLimiter &limiter, BucketDBOwner &bucketDb) noexcept
: _limiter(limiter),
+ _bucketDb(&bucketDb),
_impl()
{}
void
DocumentBucketMover::setupForBucket(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source,
- uint32_t targetSubDbId, IDocumentMoveHandler &handler, bucketdb::BucketDBOwner &bucketDb)
+ uint32_t targetSubDbId, IDocumentMoveHandler &handler)
{
- _impl = std::make_unique<BucketMover>(bucket, source, targetSubDbId, handler, bucketDb);
+ _impl = std::make_unique<BucketMover>(bucket, source, targetSubDbId, handler);
}
bool
DocumentBucketMover::moveDocuments(size_t maxDocsToMove) {
- return !_impl || _impl->moveDocuments(maxDocsToMove, _limiter);
+ return !_impl || moveDocuments(maxDocsToMove, _limiter);
+}
+
+bool
+DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter)
+{
+ if (_impl->bucketDone()) {
+ return true;
+ }
+ auto [keys, done] = _impl->getKeysToMove(maxDocsToMove);
+ auto moveOps = _impl->createMoveOperations(keys);
+ bool allOk = keys.size() == moveOps.size();
+ if (done && allOk) {
+ _impl->setBucketDone();
+ }
+ if (moveOps.empty()) return allOk;
+
+ _impl->updateLastValidGid(moveOps.back().first->getDocument()->getId().getGlobalId());
+
+ for (auto & moveOp : moveOps) {
+ // We cache the bucket for the document we are going to move to avoid getting
+ // inconsistent bucket info (getBucketInfo()) while moving between ready and not-ready
+ // sub dbs as the bucket info is not updated atomically in this case.
+ _bucketDb->takeGuard()->cacheBucket(moveOp.first->getBucketId());
+ _impl->moveDocument(std::move(moveOp.first), limiter.beginOperation());
+ _bucketDb->takeGuard()->uncacheBucket();
+ }
+ return allOk;
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
index dc91d40ce4e..c4f94a88cfa 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
@@ -5,6 +5,7 @@
#include <vespa/document/bucket/bucketid.h>
#include <vespa/document/base/globalid.h>
#include <persistence/spi/types.h>
+#include <atomic>
namespace vespalib { class IDestructorCallback; }
@@ -30,40 +31,60 @@ public:
using MoveOperationUP = std::unique_ptr<MoveOperation>;
using IDestructorCallback = vespalib::IDestructorCallback;
using IDestructorCallbackSP = std::shared_ptr<IDestructorCallback>;
+ class MoveGuard {
+ public:
+ MoveGuard() noexcept : _mover(nullptr) {}
+ MoveGuard(BucketMover & mover) noexcept
+ : _mover(&mover)
+ {
+ _mover->_started.fetch_add(1, std::memory_order_relaxed);
+ }
+ MoveGuard(MoveGuard && rhs) noexcept : _mover(rhs._mover) { rhs._mover = nullptr; }
+ MoveGuard & operator = (MoveGuard && mover) = delete;
+ MoveGuard(const MoveGuard & rhs) = delete;
+ MoveGuard & operator = (const MoveGuard & mover) = delete;
+ ~MoveGuard() {
+ if (_mover) {
+ _mover->_completed.fetch_add(1, std::memory_order_relaxed);
+ }
+ }
+ private:
+ BucketMover *_mover;
+ };
struct MoveKey
{
using Timestamp = storage::spi::Timestamp;
- MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp) noexcept
- : _lid(lid),
- _gid(gid),
- _timestamp(timestamp)
- { }
+ MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp, MoveGuard guard) noexcept;
+ MoveKey(MoveKey &&) noexcept = default;
+ ~MoveKey();
uint32_t _lid;
document::GlobalId _gid;
Timestamp _timestamp;
+ MoveGuard _guard;
};
- BucketMover(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source, uint32_t targetSubDbId,
- IDocumentMoveHandler &handler, BucketDBOwner &bucketDb) noexcept;
+ BucketMover(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source,
+ uint32_t targetSubDbId, IDocumentMoveHandler &handler) noexcept;
BucketMover(BucketMover &&) noexcept = default;
BucketMover & operator=(BucketMover &&) noexcept = delete;
BucketMover(const BucketMover &) = delete;
BucketMover & operator=(const BucketMover &) = delete;
+ ~BucketMover();
- // TODO remove once we have switched bucket move job
- bool moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter);
-
+ using GuardedMoveOp = std::pair<MoveOperationUP, MoveGuard>;
/// Must be called in master thread
- std::pair<std::vector<MoveKey>, bool> getKeysToMove(size_t maxDocsToMove) const;
+ std::pair<std::vector<MoveKey>, bool> getKeysToMove(size_t maxDocsToMove);
/// Call from any thread
- std::vector<MoveOperationUP> createMoveOperations(const std::vector<MoveKey> & toMove);
+ std::vector<GuardedMoveOp> createMoveOperations(std::vector<MoveKey> & toMove);
/// Must be called in master thread
- void moveDocuments(std::vector<MoveOperationUP> moveOps, IDestructorCallbackSP onDone);
+ void moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallbackSP onDone);
+ void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone);
const document::BucketId &getBucket() const { return _bucket; }
void cancel() { setBucketDone(); }
void setBucketDone() { _bucketDone = true; }
+ /// Signals all documents have been scheduled for move
bool bucketDone() const { return _bucketDone; }
const MaintenanceDocumentSubDB * getSource() const { return _source; }
/// Must be called in master thread
@@ -71,19 +92,24 @@ public:
_lastGid = gid;
_lastGidValid = true;
}
+ bool inSync() const {
+ return pending() == 0;
+ }
private:
const MaintenanceDocumentSubDB *_source;
IDocumentMoveHandler *_handler;
- BucketDBOwner *_bucketDb;
const document::BucketId _bucket;
const uint32_t _targetSubDbId;
- bool _bucketDone;
- document::GlobalId _lastGid;
+ std::atomic<uint32_t> _started;
+ std::atomic<uint32_t> _completed;
+ bool _bucketDone; // All moves started, or operation has been cancelled
bool _lastGidValid;
-
- void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone);
- MoveOperationUP createMoveOperation(const MoveKey & key);
+ document::GlobalId _lastGid;
+ GuardedMoveOp createMoveOperation(MoveKey & key);
+ size_t pending() const {
+ return _started.load(std::memory_order_relaxed) - _completed.load(std::memory_order_relaxed);
+ }
};
}
@@ -95,10 +121,13 @@ private:
class DocumentBucketMover
{
private:
- IMoveOperationLimiter &_limiter;
- std::unique_ptr<bucketdb::BucketMover> _impl;
+ IMoveOperationLimiter &_limiter;
+ bucketdb::BucketDBOwner *_bucketDb;
+ std::unique_ptr<bucketdb::BucketMover> _impl;
+
+ bool moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter);
public:
- DocumentBucketMover(IMoveOperationLimiter &limiter) noexcept;
+ DocumentBucketMover(IMoveOperationLimiter &limiter, bucketdb::BucketDBOwner &bucketDb) noexcept;
DocumentBucketMover(DocumentBucketMover &&) noexcept = default;
DocumentBucketMover & operator=(DocumentBucketMover &&) noexcept = delete;
DocumentBucketMover(const DocumentBucketMover &) = delete;
@@ -106,8 +135,7 @@ public:
void setupForBucket(const document::BucketId &bucket,
const MaintenanceDocumentSubDB *source,
uint32_t targetSubDbId,
- IDocumentMoveHandler &handler,
- bucketdb::BucketDBOwner &bucketDb);
+ IDocumentMoveHandler &handler);
const document::BucketId &getBucket() const { return _impl->getBucket(); }
bool moveDocuments(size_t maxDocsToMove);
void cancel() { _impl->cancel(); }
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
index c8429984b1f..ce6acb4795e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bucketmovejob.h"
+#include "bucketmovejobv2.h"
#include "heart_beat_job.h"
#include "job_tracked_maintenance_job.h"
#include "lid_space_compaction_job.h"
@@ -37,8 +38,7 @@ injectLidSpaceCompactionJobs(MaintenanceController &controller,
document::BucketSpace bucketSpace)
{
for (auto &lidHandler : lscHandlers) {
-
- IMaintenanceJob::UP job;
+ std::unique_ptr<IMaintenanceJob> job;
if (config.getLidSpaceCompactionConfig().useBucketExecutor()) {
job = std::make_unique<lidspace::CompactionJob>(
config.getLidSpaceCompactionConfig(),
@@ -65,6 +65,7 @@ void
injectBucketMoveJob(MaintenanceController &controller,
const DocumentDBMaintenanceConfig &config,
IFrozenBucketHandler &fbHandler,
+ storage::spi::BucketExecutor & bucketExecutor,
bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
const vespalib::string &docTypeName,
document::BucketSpace bucketSpace,
@@ -76,18 +77,35 @@ injectBucketMoveJob(MaintenanceController &controller,
DocumentDBJobTrackers &jobTrackers,
IDiskMemUsageNotifier &diskMemUsageNotifier)
{
- auto bmj = std::make_unique<BucketMoveJob>(calc,
- moveHandler,
- bucketModifiedHandler,
- controller.getReadySubDB(),
- controller.getNotReadySubDB(),
- fbHandler,
- bucketCreateNotifier,
- clusterStateChangedNotifier,
- bucketStateChangedNotifier,
- diskMemUsageNotifier,
- config.getBlockableJobConfig(),
- docTypeName, bucketSpace);
+ std::unique_ptr<IMaintenanceJob> bmj;
+ if (config.getBucketMoveConfig().useBucketExecutor()) {
+ bmj = std::make_unique<BucketMoveJobV2>(calc,
+ moveHandler,
+ bucketModifiedHandler,
+ controller.masterThread(),
+ bucketExecutor,
+ controller.getReadySubDB(),
+ controller.getNotReadySubDB(),
+ bucketCreateNotifier,
+ clusterStateChangedNotifier,
+ bucketStateChangedNotifier,
+ diskMemUsageNotifier,
+ config.getBlockableJobConfig(),
+ docTypeName, bucketSpace);
+ } else {
+ bmj = std::make_unique<BucketMoveJob>(calc,
+ moveHandler,
+ bucketModifiedHandler,
+ controller.getReadySubDB(),
+ controller.getNotReadySubDB(),
+ fbHandler,
+ bucketCreateNotifier,
+ clusterStateChangedNotifier,
+ bucketStateChangedNotifier,
+ diskMemUsageNotifier,
+ config.getBlockableJobConfig(),
+ docTypeName, bucketSpace);
+ }
controller.registerJobInMasterThread(trackJob(jobTrackers.getBucketMove(), std::move(bmj)));
}
@@ -133,9 +151,9 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller,
clusterStateChangedNotifier, calc, bucketSpace);
}
- injectBucketMoveJob(controller, config, fbHandler, bucketCreateNotifier, docTypeName, bucketSpace, moveHandler,
- bucketModifiedHandler, clusterStateChangedNotifier, bucketStateChangedNotifier, calc,
- jobTrackers, diskMemUsageNotifier);
+ injectBucketMoveJob(controller, config, fbHandler, bucketExecutor, bucketCreateNotifier, docTypeName, bucketSpace,
+ moveHandler, bucketModifiedHandler, clusterStateChangedNotifier, bucketStateChangedNotifier,
+ calc, jobTrackers, diskMemUsageNotifier);
controller.registerJobInMasterThread(
std::make_unique<SampleAttributeUsageJob>(readyAttributeManager, notReadyAttributeManager,