From aa583fb4e59d126715b80d3ae5bd7cb483bfe623 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Wed, 3 Feb 2021 09:24:58 +0000 Subject: Add alternative bucket move job without frozen concept. --- .../documentdb/documentbucketmover/CMakeLists.txt | 12 + .../documentbucketmover/bucketmover_common.h | 2 +- .../documentbucketmover_test.cpp | 2 +- .../documentbucketmover_v2_test.cpp | 578 +++++++++++++++++++++ .../documentbucketmover/documentmover_test.cpp | 6 +- .../vespa/searchcore/proton/server/CMakeLists.txt | 1 + .../searchcore/proton/server/bucketmovejob.cpp | 7 +- .../searchcore/proton/server/bucketmovejobv2.cpp | 368 +++++++++++++ .../searchcore/proton/server/bucketmovejobv2.h | 119 +++++ .../proton/server/documentbucketmover.cpp | 116 +++-- .../searchcore/proton/server/documentbucketmover.h | 76 ++- .../proton/server/maintenance_jobs_injector.cpp | 52 +- 12 files changed, 1239 insertions(+), 100 deletions(-) create mode 100644 searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp create mode 100644 searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp create mode 100644 searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h (limited to 'searchcore') diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt index 1aa0b1c585d..d11da09b737 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt @@ -17,6 +17,18 @@ vespa_add_executable(searchcore_documentbucketmover_test_app TEST ) vespa_add_test(NAME searchcore_documentbucketmover_test_app COMMAND searchcore_documentbucketmover_test_app) +vespa_add_executable(searchcore_documentbucketmover_v2_test_app TEST + SOURCES + documentbucketmover_v2_test.cpp + DEPENDS + searchcore_bucketmover_test + searchcore_test + searchcore_server + 
searchcore_feedoperation + GTest::GTest +) +vespa_add_test(NAME searchcore_documentbucketmover_v2_test_app COMMAND searchcore_documentbucketmover_v2_test_app) + vespa_add_executable(searchcore_scaniterator_test_app TEST SOURCES scaniterator_test.cpp diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h index 65e206d7327..e2cd5e268d7 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h @@ -82,7 +82,7 @@ struct MyDocumentRetriever : public DocumentRetrieverBaseForTest { struct MyBucketModifiedHandler : public IBucketModifiedHandler { using BucketId = document::BucketId; - BucketId::List _modified; + std::vector _modified; void notifyBucketModified(const BucketId &bucket) override; diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp index 502639d1dca..5556ed0d475 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -112,7 +112,7 @@ struct ControllerFixtureBase : public ::testing::Test const MoveOperationVector &docsMoved() const { return _moveHandler._moves; } - const BucketId::List &bucketsModified() const { + const std::vector &bucketsModified() const { return _modifiedHandler._modified; } const BucketId::List &calcAsked() const { diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp new file mode 100644 index 00000000000..e7dc9b9e873 --- /dev/null +++ 
b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp @@ -0,0 +1,578 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bucketmover_common.h" +#include +#include +#include + +#include +LOG_SETUP("document_bucket_mover_test"); + +using namespace proton; +using namespace proton::move::test; +using document::BucketId; +using document::test::makeBucketSpace; +using proton::bucketdb::BucketCreateNotifier; +using storage::spi::BucketInfo; +using BlockedReason = IBlockableMaintenanceJob::BlockedReason; +using MoveOperationVector = std::vector; +using storage::spi::dummy::DummyBucketExecutor; +using vespalib::ThreadStackExecutor; + +struct ControllerFixtureBase : public ::testing::Test +{ + test::UserDocumentsBuilder _builder; + test::BucketStateCalculator::SP _calc; + test::ClusterStateHandler _clusterStateHandler; + test::BucketHandler _bucketHandler; + MyBucketModifiedHandler _modifiedHandler; + std::shared_ptr _bucketDB; + MySubDb _ready; + MySubDb _notReady; + BucketCreateNotifier _bucketCreateNotifier; + test::DiskMemUsageNotifier _diskMemUsageNotifier; + ThreadStackExecutor _singleExecutor; + ExecutorThreadService _master; + DummyBucketExecutor _bucketExecutor; + MyMoveHandler _moveHandler; + BucketMoveJobV2 _bmj; + MyCountJobRunner _runner; + ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts); + ~ControllerFixtureBase(); + ControllerFixtureBase &addReady(const BucketId &bucket) { + _calc->addReady(bucket); + return *this; + } + ControllerFixtureBase &remReady(const BucketId &bucket) { + _calc->remReady(bucket); + return *this; + } + ControllerFixtureBase &changeCalc() { + _calc->resetAsked(); + _moveHandler.reset(); + _modifiedHandler.reset(); + _clusterStateHandler.notifyClusterStateChanged(_calc); + return *this; + } + ControllerFixtureBase &activateBucket(const BucketId &bucket) { + 
_ready.setBucketState(bucket, true); + _bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::ACTIVE); + return *this; + } + ControllerFixtureBase &deactivateBucket(const BucketId &bucket) { + _ready.setBucketState(bucket, false); + _bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::NOT_ACTIVE); + return *this; + } + const MoveOperationVector &docsMoved() const { + return _moveHandler._moves; + } + const std::vector &bucketsModified() const { + return _modifiedHandler._modified; + } + const BucketId::List &calcAsked() const { + return _calc->asked(); + } + void runLoop() { + while (!_bmj.isBlocked() && !_bmj.run()) { + } + } + void sync() { + _bucketExecutor.sync(); + _master.sync(); + _master.sync(); // Handle that master schedules onto master again + } +}; + +ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts) + : _builder(), + _calc(std::make_shared()), + _bucketHandler(), + _modifiedHandler(), + _bucketDB(std::make_shared()), + _ready(_builder.getRepo(), _bucketDB, 1, SubDbType::READY), + _notReady(_builder.getRepo(), _bucketDB, 2, SubDbType::NOTREADY), + _bucketCreateNotifier(), + _diskMemUsageNotifier(), + _singleExecutor(1, 0x10000), + _master(_singleExecutor), + _bucketExecutor(4), + _moveHandler(*_bucketDB, storeMoveDoneContexts), + _bmj(_calc, _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb, + _notReady._subDb, _bucketCreateNotifier, _clusterStateHandler, _bucketHandler, + _diskMemUsageNotifier, blockableConfig, + "test", makeBucketSpace()), + _runner(_bmj) +{ +} + +ControllerFixtureBase::~ControllerFixtureBase() = default; +constexpr double RESOURCE_LIMIT_FACTOR = 1.0; +constexpr uint32_t MAX_OUTSTANDING_OPS = 10; +const BlockableMaintenanceJobConfig BLOCKABLE_CONFIG(RESOURCE_LIMIT_FACTOR, MAX_OUTSTANDING_OPS); + +struct ControllerFixture : public ControllerFixtureBase +{ + ControllerFixture(const 
BlockableMaintenanceJobConfig &blockableConfig = BLOCKABLE_CONFIG) + : ControllerFixtureBase(blockableConfig, blockableConfig.getMaxOutstandingMoveOps() != MAX_OUTSTANDING_OPS) + { + _builder.createDocs(1, 1, 4); // 3 docs + _builder.createDocs(2, 4, 6); // 2 docs + _ready.insertDocs(_builder.getDocs()); + _builder.clearDocs(); + _builder.createDocs(3, 1, 3); // 2 docs + _builder.createDocs(4, 3, 6); // 3 docs + _notReady.insertDocs(_builder.getDocs()); + } +}; + +struct OnlyReadyControllerFixture : public ControllerFixtureBase +{ + OnlyReadyControllerFixture() : ControllerFixtureBase(BLOCKABLE_CONFIG, false) + { + _builder.createDocs(1, 1, 2); // 1 docs + _builder.createDocs(2, 2, 4); // 2 docs + _builder.createDocs(3, 4, 7); // 3 docs + _builder.createDocs(4, 7, 11); // 4 docs + _ready.insertDocs(_builder.getDocs()); + } +}; + +TEST_F(ControllerFixture, require_that_nothing_is_moved_if_bucket_state_says_so) +{ + EXPECT_TRUE(_bmj.done()); + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + _bmj.recompute(); + EXPECT_TRUE(_bmj.scanAndMove(4, 3)); + EXPECT_TRUE(_bmj.done()); + EXPECT_TRUE(docsMoved().empty()); + EXPECT_TRUE(bucketsModified().empty()); +} + +TEST_F(ControllerFixture, require_that_not_ready_bucket_is_moved_to_ready_if_bucket_state_says_so) +{ + // bucket 4 should be moved + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + addReady(_notReady.bucket(4)); + _bmj.recompute(); + EXPECT_FALSE(_bmj.done()); + EXPECT_TRUE(_bmj.scanAndMove(4, 3)); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(3u, docsMoved().size()); + assertEqual(_notReady.bucket(4), _notReady.docs(4)[0], 2, 1, docsMoved()[0]); + assertEqual(_notReady.bucket(4), _notReady.docs(4)[1], 2, 1, docsMoved()[1]); + assertEqual(_notReady.bucket(4), _notReady.docs(4)[2], 2, 1, docsMoved()[2]); + ASSERT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_notReady.bucket(4), bucketsModified()[0]); +} + +TEST_F(ControllerFixture, 
require_that_ready_bucket_is_moved_to_not_ready_if_bucket_state_says_so) +{ + // bucket 2 should be moved + addReady(_ready.bucket(1)); + _bmj.recompute(); + EXPECT_FALSE(_bmj.done()); + EXPECT_TRUE(_bmj.scanAndMove(4, 3)); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(2u, docsMoved().size()); + assertEqual(_ready.bucket(2), _ready.docs(2)[0], 1, 2, docsMoved()[0]); + assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]); + EXPECT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); +} + + +TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps) +{ + // bucket 2, 3, and 4 should be moved + addReady(_ready.bucket(1)); + addReady(_notReady.bucket(3)); + addReady(_notReady.bucket(4)); + + _bmj.recompute(); + EXPECT_FALSE(_bmj.done()); + + EXPECT_FALSE(_bmj.scanAndMove(1, 2)); + EXPECT_FALSE(_bmj.done()); + sync(); + EXPECT_EQ(2u, docsMoved().size()); + + EXPECT_FALSE(_bmj.scanAndMove(1, 2)); + EXPECT_FALSE(_bmj.done()); + sync(); + EXPECT_EQ(4u, docsMoved().size()); + + EXPECT_FALSE(_bmj.scanAndMove(1, 2)); + EXPECT_FALSE(_bmj.done()); + sync(); + EXPECT_EQ(6u, docsMoved().size()); + + // move bucket 4, docs 3 + EXPECT_TRUE(_bmj.scanAndMove(1,2)); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(7u, docsMoved().size()); + EXPECT_EQ(3u, bucketsModified().size()); + EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); + EXPECT_EQ(_notReady.bucket(3), bucketsModified()[1]); + EXPECT_EQ(_notReady.bucket(4), bucketsModified()[2]); +} + +TEST_F(ControllerFixture, require_that_last_bucket_is_moved_before_reporting_done) +{ + // bucket 4 should be moved + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + addReady(_notReady.bucket(4)); + _bmj.recompute(); + EXPECT_FALSE(_bmj.done()); + + EXPECT_FALSE(_bmj.scanAndMove(1, 1)); + EXPECT_FALSE(_bmj.done()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_TRUE(_bmj.scanAndMove(1, 2)); + 
EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(3u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); +} + + +TEST_F(ControllerFixture, require_that_active_bucket_is_not_moved_from_ready_to_not_ready_until_being_not_active) +{ + // bucket 1 should be moved but is active + addReady(_ready.bucket(2)); + _bmj.recompute(); + EXPECT_FALSE(_bmj.done()); + activateBucket(_ready.bucket(1)); + EXPECT_TRUE(_bmj.scanAndMove(4, 3)); // scan all, delay active bucket 1 + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(0u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + + deactivateBucket(_ready.bucket(1)); + EXPECT_FALSE(_bmj.done()); + EXPECT_TRUE(_bmj.scanAndMove(4, 3)); // move delayed and de-activated bucket 1 + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(3u, docsMoved().size()); + EXPECT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]); +} + +TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_we_change_calculator) +{ + // bucket 1 should be moved + addReady(_ready.bucket(2)); + _bmj.recompute(); + _bmj.scanAndMove(1, 1); + EXPECT_FALSE(_bmj.done()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); + changeCalc(); // Not cancelled, bucket 1 still moving to notReady + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); + _calc->resetAsked(); + _bmj.scanAndMove(1, 1); + EXPECT_FALSE(_bmj.done()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(0u, calcAsked().size()); + addReady(_ready.bucket(1)); + changeCalc(); // cancelled, bucket 1 no longer moving to notReady + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); + _calc->resetAsked(); + remReady(_ready.bucket(1)); + _calc->resetAsked(); + changeCalc(); // not cancelled. 
No active bucket move + EXPECT_EQ(4u, calcAsked().size()); + _bmj.scanAndMove(1, 1); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_ready.bucket(2), calcAsked()[1]); + EXPECT_EQ(_notReady.bucket(3), calcAsked()[2]); + _bmj.scanAndMove(2, 3); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(3u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_notReady.bucket(4), calcAsked()[3]); + EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); +} + +TEST_F(ControllerFixture, require_that_de_activated_bucket_is_not_moved_if_new_calculator_does_not_say_so) +{ + // bucket 1 should be moved + addReady(_ready.bucket(2)); + _bmj.recompute(); + activateBucket(_ready.bucket(1)); + _bmj.scanAndMove(4, 3); // scan all, delay active bucket 1 + sync(); + EXPECT_EQ(0u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + + deactivateBucket(_ready.bucket(1)); + addReady(_ready.bucket(1)); + changeCalc(); + _bmj.scanAndMove(4, 3); // consider delayed bucket 3 + sync(); + EXPECT_EQ(0u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); +} + +TEST_F(ControllerFixture, ready_bucket_not_moved_to_not_ready_if_node_is_marked_as_retired) +{ + _calc->setNodeRetired(true); + // Bucket 2 would be moved from ready to not ready in a non-retired case, but not when retired. + addReady(_ready.bucket(1)); + _bmj.recompute(); + _bmj.scanAndMove(4, 3); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(0u, docsMoved().size()); +} + +// Technically this should never happen since a retired node is never in the ideal state, +// but test this case for the sake of completion. 
+TEST_F(ControllerFixture, inactive_not_ready_bucket_not_moved_to_ready_if_node_is_marked_as_retired) +{ + _calc->setNodeRetired(true); + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + addReady(_notReady.bucket(3)); + _bmj.recompute(); + _bmj.scanAndMove(4, 3); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(0u, docsMoved().size()); +} + +TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_ready_even_if_node_is_marked_as_retired) +{ + _calc->setNodeRetired(true); + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + addReady(_notReady.bucket(3)); + _bmj.recompute(); + activateBucket(_notReady.bucket(3)); + _bmj.scanAndMove(4, 3); + EXPECT_TRUE(_bmj.done()); + sync(); + ASSERT_EQ(2u, docsMoved().size()); + assertEqual(_notReady.bucket(3), _notReady.docs(3)[0], 2, 1, docsMoved()[0]); + assertEqual(_notReady.bucket(3), _notReady.docs(3)[1], 2, 1, docsMoved()[1]); + ASSERT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]); +} + + +TEST_F(ControllerFixture, require_that_notifyCreateBucket_causes_bucket_to_be_reconsidered_by_job) +{ + EXPECT_TRUE(_bmj.done()); + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + runLoop(); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_TRUE(docsMoved().empty()); + EXPECT_TRUE(bucketsModified().empty()); + addReady(_notReady.bucket(3)); // bucket 3 now ready, no notify + EXPECT_TRUE(_bmj.done()); // move job still believes work done + sync(); + EXPECT_TRUE(bucketsModified().empty()); + _bmj.notifyCreateBucket(_bucketDB->takeGuard(), _notReady.bucket(3)); // reconsider bucket 3 + EXPECT_FALSE(_bmj.done()); + EXPECT_TRUE(bucketsModified().empty()); + sync(); + EXPECT_TRUE(bucketsModified().empty()); + runLoop(); + EXPECT_TRUE(_bmj.done()); + sync(); + + EXPECT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(2u, docsMoved().size()); +} + + +struct ResourceLimitControllerFixture : public ControllerFixture +{ + ResourceLimitControllerFixture(double 
resourceLimitFactor = RESOURCE_LIMIT_FACTOR) : + ControllerFixture(BlockableMaintenanceJobConfig(resourceLimitFactor, MAX_OUTSTANDING_OPS)) + {} + + void testJobStopping(DiskMemUsageState blockingUsageState) { + // Bucket 1 should be moved + addReady(_ready.bucket(2)); + _bmj.recompute(); + EXPECT_FALSE(_bmj.done()); + // Note: This depends on _bmj.run() moving max 1 documents + EXPECT_FALSE(_bmj.run()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + // Notify that we've over limit + _diskMemUsageNotifier.notify(blockingUsageState); + EXPECT_TRUE(_bmj.run()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + // Notify that we've under limit + _diskMemUsageNotifier.notify(DiskMemUsageState()); + EXPECT_FALSE(_bmj.run()); + sync(); + EXPECT_EQ(2u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + } + + void testJobNotStopping(DiskMemUsageState blockingUsageState) { + // Bucket 1 should be moved + addReady(_ready.bucket(2)); + _bmj.recompute(); + EXPECT_FALSE(_bmj.done()); + // Note: This depends on _bmj.run() moving max 1 documents + EXPECT_FALSE(_bmj.run()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + // Notify that we've over limit, but not over adjusted limit + _diskMemUsageNotifier.notify(blockingUsageState); + EXPECT_FALSE(_bmj.run()); + sync(); + EXPECT_EQ(2u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + } +}; + +struct ResourceLimitControllerFixture_1_2 : public ResourceLimitControllerFixture { + ResourceLimitControllerFixture_1_2() : ResourceLimitControllerFixture(1.2) {} +}; + +TEST_F(ResourceLimitControllerFixture, require_that_bucket_move_stops_when_disk_limit_is_reached) +{ + testJobStopping(DiskMemUsageState(ResourceUsageState(0.7, 0.8), ResourceUsageState())); +} + +TEST_F(ResourceLimitControllerFixture, require_that_bucket_move_stops_when_memory_limit_is_reached) +{ + 
testJobStopping(DiskMemUsageState(ResourceUsageState(), ResourceUsageState(0.7, 0.8))); +} + +TEST_F(ResourceLimitControllerFixture_1_2, require_that_bucket_move_uses_resource_limit_factor_for_disk_resource_limit) +{ + testJobNotStopping(DiskMemUsageState(ResourceUsageState(0.7, 0.8), ResourceUsageState())); +} + +TEST_F(ResourceLimitControllerFixture_1_2, require_that_bucket_move_uses_resource_limit_factor_for_memory_resource_limit) +{ + testJobNotStopping(DiskMemUsageState(ResourceUsageState(), ResourceUsageState(0.7, 0.8))); +} + + +struct MaxOutstandingMoveOpsFixture : public ControllerFixtureBase +{ + MaxOutstandingMoveOpsFixture(uint32_t maxOutstandingOps) + : ControllerFixtureBase(BlockableMaintenanceJobConfig(RESOURCE_LIMIT_FACTOR, maxOutstandingOps), true) + { + _builder.createDocs(1, 1, 2); + _builder.createDocs(2, 2, 3); + _builder.createDocs(3, 3, 4); + _builder.createDocs(4, 4, 5); + _ready.insertDocs(_builder.getDocs()); + _builder.clearDocs(); + _builder.createDocs(11, 1, 2); + _builder.createDocs(12, 2, 3); + _builder.createDocs(13, 3, 4); + _builder.createDocs(14, 4, 5); + _notReady.insertDocs(_builder.getDocs()); + addReady(_ready.bucket(3)); + _bmj.recompute(); + } + + void assertRunToBlocked() { + EXPECT_TRUE(_bmj.run()); // job becomes blocked as max outstanding limit is reached + EXPECT_FALSE(_bmj.done()); + EXPECT_TRUE(_bmj.isBlocked()); + EXPECT_TRUE(_bmj.isBlocked(BlockedReason::OUTSTANDING_OPS)); + } + void assertRunToNotBlocked() { + EXPECT_FALSE(_bmj.run()); + EXPECT_FALSE(_bmj.done()); + EXPECT_FALSE(_bmj.isBlocked()); + } + void assertRunToFinished() { + EXPECT_TRUE(_bmj.run()); + EXPECT_TRUE(_bmj.done()); + EXPECT_FALSE(_bmj.isBlocked()); + } + void assertDocsMoved(uint32_t expDocsMovedCnt, uint32_t expMoveContextsCnt) { + EXPECT_EQ(expDocsMovedCnt, docsMoved().size()); + EXPECT_EQ(expMoveContextsCnt, _moveHandler._moveDoneContexts.size()); + } + void unblockJob(uint32_t expRunnerCnt) { + _moveHandler.clearMoveDoneContexts(); // 
unblocks job and try to execute it via runner + EXPECT_EQ(expRunnerCnt, _runner.runCount); + EXPECT_FALSE(_bmj.isBlocked()); + } +}; + +struct MaxOutstandingMoveOpsFixture_1 : public MaxOutstandingMoveOpsFixture { + MaxOutstandingMoveOpsFixture_1() : MaxOutstandingMoveOpsFixture(1) {} +}; + +struct MaxOutstandingMoveOpsFixture_2 : public MaxOutstandingMoveOpsFixture { + MaxOutstandingMoveOpsFixture_2() : MaxOutstandingMoveOpsFixture(2) {} +}; + +TEST_F(MaxOutstandingMoveOpsFixture_1, require_that_bucket_move_job_is_blocked_if_it_has_too_many_outstanding_move_operations_max_1) +{ + assertRunToBlocked(); + sync(); + assertDocsMoved(1, 1); + assertRunToBlocked(); + assertDocsMoved(1, 1); + + unblockJob(1); + assertRunToBlocked(); + sync(); + assertDocsMoved(2, 1); + + unblockJob(2); + assertRunToBlocked(); + sync(); + assertDocsMoved(3, 1); + + unblockJob(3); + assertRunToFinished(); + sync(); + assertDocsMoved(3, 0); +} + +TEST_F(MaxOutstandingMoveOpsFixture_2, require_that_bucket_move_job_is_blocked_if_it_has_too_many_outstanding_move_operations_max_2) +{ + assertRunToNotBlocked(); + sync(); + assertDocsMoved(1, 1); + + assertRunToBlocked(); + sync(); + assertDocsMoved(2, 2); + + unblockJob(2); + assertRunToFinished(); + sync(); + assertDocsMoved(3, 1); +} + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp index 420f45d99e8..ae504fef603 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp @@ -43,7 +43,7 @@ struct DocumentMoverTest : ::testing::Test : _builder(), _bucketDB(std::make_shared()), _limiter(), - _mover(_limiter), + _mover(_limiter, _bucketDb), _source(_builder, _bucketDB, 0u, SubDbType::READY), _bucketDb(), _handler(_bucketDb) @@ -58,7 +58,7 @@ struct DocumentMoverTest : 
::testing::Test _source._subDb.retriever(), _source._subDb.feed_view(), &_pendingLidsForCommit); - _mover.setupForBucket(bucket, &_source._subDb, targetSubDbId, _handler, _bucketDb); + _mover.setupForBucket(bucket, &_source._subDb, targetSubDbId, _handler); } bool moveDocuments(size_t maxDocsToMove) { return _mover.moveDocuments(maxDocsToMove); @@ -68,7 +68,7 @@ struct DocumentMoverTest : ::testing::Test TEST_F(DocumentMoverTest, require_that_initial_bucket_mover_is_done) { MyMoveOperationLimiter limiter; - DocumentBucketMover mover(limiter); + DocumentBucketMover mover(limiter, _bucketDb); EXPECT_TRUE(mover.bucketDone()); mover.moveDocuments(2); EXPECT_TRUE(mover.bucketDone()); diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index 26fba8a780f..86d238c7554 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -6,6 +6,7 @@ vespa_add_library(searchcore_server STATIC bootstrapconfigmanager.cpp buckethandler.cpp bucketmovejob.cpp + bucketmovejobv2.cpp clusterstatehandler.cpp combiningfeedview.cpp ddbstate.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp index 8b9b3c539d6..30b1e8b623c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp @@ -64,8 +64,7 @@ BucketMoveJob::checkBucket(const BucketId &bucket, const MaintenanceDocumentSubDB &target(wantReady ? 
_ready : _notReady); LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)", bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id()); - mover.setupForBucket(bucket, &source, target.sub_db_id(), - _moveHandler, _ready.meta_store()->getBucketDB()); + mover.setupForBucket(bucket, &source, target.sub_db_id(), _moveHandler); } BucketMoveJob::ScanResult @@ -151,7 +150,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc, _modifiedHandler(modifiedHandler), _ready(ready), _notReady(notReady), - _mover(getLimiter()), + _mover(getLimiter(), _ready.meta_store()->getBucketDB()), _doneScan(false), _scanPos(), _scanPass(ScanPass::FIRST), @@ -161,7 +160,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc, _delayedBucketsFrozen(), _frozenBuckets(frozenBuckets), _bucketCreateNotifier(bucketCreateNotifier), - _delayedMover(getLimiter()), + _delayedMover(getLimiter(), _ready.meta_store()->getBucketDB()), _clusterStateChangedNotifier(clusterStateChangedNotifier), _bucketStateChangedNotifier(bucketStateChangedNotifier), _diskMemUsageNotifier(diskMemUsageNotifier) diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp new file mode 100644 index 00000000000..77942b72478 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp @@ -0,0 +1,368 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include "bucketmovejobv2.h" +#include "imaintenancejobrunner.h" +#include "ibucketstatechangednotifier.h" +#include "iclusterstatechangednotifier.h" +#include "maintenancedocumentsubdb.h" +#include "i_disk_mem_usage_notifier.h" +#include "ibucketmodifiedhandler.h" +#include "move_operation_limiter.h" +#include "document_db_maintenance_config.h" +#include +#include +#include +#include +#include +#include +#include + +#include +LOG_SETUP(".proton.server.bucketmovejob"); + +using document::BucketId; +using storage::spi::BucketInfo; +using storage::spi::Bucket; +using storage::spi::makeBucketTask; +using proton::bucketdb::BucketMover; +using vespalib::makeLambdaTask; + +namespace proton { + +namespace { + +const char * bool2str(bool v) { return (v ? "T" : "F"); } + +bool +blockedDueToClusterState(const IBucketStateCalculator::SP &calc) +{ + bool clusterUp = calc && calc->clusterUp(); + bool nodeUp = calc && calc->nodeUp(); + bool nodeInitializing = calc && calc->nodeInitializing(); + return !(clusterUp && nodeUp && !nodeInitializing); +} + +} + +BucketMoveJobV2::BucketMoveJobV2(const IBucketStateCalculator::SP &calc, + IDocumentMoveHandler &moveHandler, + IBucketModifiedHandler &modifiedHandler, + IThreadService & master, + BucketExecutor & bucketExecutor, + const MaintenanceDocumentSubDB &ready, + const MaintenanceDocumentSubDB ¬Ready, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + const vespalib::string &docTypeName, + document::BucketSpace bucketSpace) + : BlockableMaintenanceJob("move_buckets." 
+ docTypeName, vespalib::duration::zero(), vespalib::duration::zero(), blockableConfig), + IClusterStateChangedHandler(), + bucketdb::IBucketCreateListener(), + IBucketStateChangedHandler(), + IDiskMemUsageListener(), + _calc(calc), + _moveHandler(moveHandler), + _modifiedHandler(modifiedHandler), + _master(master), + _bucketExecutor(bucketExecutor), + _ready(ready), + _notReady(notReady), + _bucketSpace(bucketSpace), + _iterateCount(0), + _movers(), + _buckets2Move(), + _stopped(false), + _startedCount(0), + _executedCount(0), + _bucketCreateNotifier(bucketCreateNotifier), + _clusterStateChangedNotifier(clusterStateChangedNotifier), + _bucketStateChangedNotifier(bucketStateChangedNotifier), + _diskMemUsageNotifier(diskMemUsageNotifier) +{ + _movers.reserve(std::min(100u, blockableConfig.getMaxOutstandingMoveOps())); + if (blockedDueToClusterState(_calc)) { + setBlocked(BlockedReason::CLUSTER_STATE); + } + + _bucketCreateNotifier.addListener(this); + _clusterStateChangedNotifier.addClusterStateChangedHandler(this); + _bucketStateChangedNotifier.addBucketStateChangedHandler(this); + _diskMemUsageNotifier.addDiskMemUsageListener(this); + recompute(); +} + +BucketMoveJobV2::~BucketMoveJobV2() +{ + _bucketCreateNotifier.removeListener(this); + _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); + _bucketStateChangedNotifier.removeBucketStateChangedHandler(this); + _diskMemUsageNotifier.removeDiskMemUsageListener(this); +} + +BucketMoveJobV2::NeedResult +BucketMoveJobV2::needMove(const ScanIterator &itr) const { + NeedResult noMove(false, false); + const bool hasReadyDocs = itr.hasReadyBucketDocs(); + const bool hasNotReadyDocs = itr.hasNotReadyBucketDocs(); + if (!hasReadyDocs && !hasNotReadyDocs) { + return noMove; // No documents for bucket in ready or notready subdbs + } + const bool isActive = itr.isActive(); + // No point in moving buckets when node is retired and everything will be deleted soon. 
+ // However, allow moving of explicitly activated buckets, as this implies a lack of other good replicas. + if (!_calc || (_calc->nodeRetired() && !isActive)) { + return noMove; + } + const bool shouldBeReady = _calc->shouldBeReady(document::Bucket(_bucketSpace, itr.getBucket())); + const bool wantReady = shouldBeReady || isActive; + LOG(spam, "checkBucket(): bucket(%s), shouldBeReady(%s), active(%s)", + itr.getBucket().toString().c_str(), bool2str(shouldBeReady), bool2str(isActive)); + if (wantReady) { + if (!hasNotReadyDocs) + return noMove; // No notready bucket to make ready + } else { + if (isActive) + return noMove; // Do not move rom ready to not ready when active + if (!hasReadyDocs) + return noMove; // No ready bucket to make notready + } + return {true, wantReady}; +} + +void +BucketMoveJobV2::startMove(BucketMoverSP mover, size_t maxDocsToMove) { + auto [keys, done] = mover->getKeysToMove(maxDocsToMove); + if (done) { + mover->setBucketDone(); + } + if (keys.empty()) return; + if (_stopped.load(std::memory_order_relaxed)) return; + mover->updateLastValidGid(keys.back()._gid); + auto context = getLimiter().beginOperation(); + Bucket spiBucket(document::Bucket(_bucketSpace, mover->getBucket())); + auto bucketTask = makeBucketTask( + [this, mover=std::move(mover), keys=std::move(keys),opsTracker=getLimiter().beginOperation()] + (const Bucket & bucket, std::shared_ptr onDone) mutable + { + assert(mover->getBucket() == bucket.getBucketId()); + using DoneContext = vespalib::KeepAlive>; + prepareMove(std::move(mover), std::move(keys), + std::make_shared(std::make_pair(std::move(opsTracker), std::move(onDone)))); + }); + auto failed = _bucketExecutor.execute(spiBucket, std::move(bucketTask)); + if (!failed) { + _startedCount.fetch_add(1, std::memory_order_relaxed); + } +} + +namespace { + +class IncOnDestruct { +public: + IncOnDestruct(std::atomic & count) : _count(count) {} + ~IncOnDestruct() { + _count.fetch_add(1, std::memory_order_relaxed); + } +private: + 
std::atomic & _count; +}; + +} + +void +BucketMoveJobV2::prepareMove(BucketMoverSP mover, std::vector keys, IDestructorCallbackSP onDone) +{ + IncOnDestruct countGuard(_executedCount); + if (_stopped.load(std::memory_order_relaxed)) return; + auto moveOps = mover->createMoveOperations(keys); + _master.execute(makeLambdaTask([this, mover=std::move(mover), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable { + completeMove(std::move(mover), std::move(moveOps), std::move(onDone)); + })); +} + +void +BucketMoveJobV2::completeMove(BucketMoverSP mover, std::vector ops, IDestructorCallbackSP onDone) { + mover->moveDocuments(std::move(ops), std::move(onDone)); + if (mover->bucketDone() && mover->inSync()) { + _modifiedHandler.notifyBucketModified(mover->getBucket()); + } +} + +void +BucketMoveJobV2::cancelMovesForBucket(BucketId bucket) { + for (auto itr = _movers.begin(); itr != _movers.end(); itr++) { + if (bucket == (*itr)->getBucket()) { + _movers.erase(itr); + backFillMovers(); + return; + } + } +} + +void +BucketMoveJobV2::considerBucket(const bucketdb::Guard & guard, BucketId bucket) +{ + ScanIterator itr(guard, bucket); + auto [mustMove, wantReady] = needMove(itr); + if (mustMove) { + _buckets2Move[bucket] = wantReady; + } else { + _buckets2Move.erase(bucket); + cancelMovesForBucket(bucket); + } + backFillMovers(); + considerRun(); +} + +void +BucketMoveJobV2::notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) +{ + considerBucket(guard, bucket); +} + +BucketMoveJobV2::BucketSet +BucketMoveJobV2::computeBuckets2Move() +{ + BucketMoveJobV2::BucketSet toMove; + for (ScanIterator itr(_ready.meta_store()->getBucketDB().takeGuard(), BucketId()); itr.valid(); ++itr) { + auto [mustMove, wantReady] = needMove(itr); + if (mustMove) { + toMove[itr.getBucket()] = wantReady; + } + } + return toMove; +} + +std::shared_ptr +BucketMoveJobV2::createMover(BucketId bucket, bool wantReady) { + const MaintenanceDocumentSubDB &source(wantReady ? 
_notReady : _ready); + const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady); + LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)", + bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id()); + return std::make_shared(bucket, &source, target.sub_db_id(), _moveHandler); +} + +std::shared_ptr +BucketMoveJobV2::greedyCreateMover() { + if ( ! _buckets2Move.empty()) { + auto next = _buckets2Move.begin(); + auto mover = createMover(next->first, next->second); + _buckets2Move.erase(next->first); + return mover; + } + return {}; +} + +bool +BucketMoveJobV2::moveDocs(size_t maxDocsToMove) { + if (done()) return true; + + // Select mover + size_t index = _iterateCount++ % _movers.size(); + const auto & mover = _movers[index]; + + //Move, or reduce movers as we are tailing off + if (!mover->bucketDone()) { + startMove(mover, maxDocsToMove); + if (mover->bucketDone()) { + auto next = greedyCreateMover(); + if (next) { + _movers[index] = next; + } else { + _movers.erase(_movers.begin() + index); + } + } + } + return done(); +} + +bool +BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket) { + for (size_t i(0); i < maxBuckets2Move; i++) { + moveDocs(maxDocsToMovePerBucket); + } + return isBlocked() || done(); +} + +bool +BucketMoveJobV2::done() const { + return _buckets2Move.empty() && _movers.empty() && !isBlocked(); +} + +bool +BucketMoveJobV2::run() +{ + if (isBlocked()) { + return true; // indicate work is done, since node state is bad + } + /// Returning false here will immediately post the job back on the executor. This will give a busy loop, + /// but this is considered fine as it is very rare and it will be intermingled with multiple feed operations. + if ( ! 
scanAndMove(1, 1) ) { + return false; + } + + if (isBlocked(BlockedReason::OUTSTANDING_OPS)) { + return true; + } + return done(); +} + +void +BucketMoveJobV2::recompute() { + _movers.clear(); + _buckets2Move = computeBuckets2Move(); + backFillMovers(); +} + +void +BucketMoveJobV2::backFillMovers() { + // Ensure we have enough movers. + while ( ! _buckets2Move.empty() && (_movers.size() < _movers.capacity())) { + _movers.push_back(greedyCreateMover()); + } +} +void +BucketMoveJobV2::notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) +{ + // Called by master write thread + _calc = newCalc; + recompute(); + if (blockedDueToClusterState(_calc)) { + setBlocked(BlockedReason::CLUSTER_STATE); + } else { + unBlock(BlockedReason::CLUSTER_STATE); + } +} + +void +BucketMoveJobV2::notifyBucketStateChanged(const BucketId &bucketId, BucketInfo::ActiveState) +{ + // Called by master write thread + considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId); +} + +void +BucketMoveJobV2::notifyDiskMemUsage(DiskMemUsageState state) +{ + // Called by master write thread + internalNotifyDiskMemUsage(state); +} + +bool +BucketMoveJobV2::inSync() const { + return _executedCount == _startedCount; +} + +void +BucketMoveJobV2::onStop() { + // Called by master write thread + _stopped = true; + while ( ! inSync() ) { + std::this_thread::sleep_for(1ms); + } +} + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h new file mode 100644 index 00000000000..714f24b6ff6 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h @@ -0,0 +1,119 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#pragma once + +#include "blockable_maintenance_job.h" +#include "documentbucketmover.h" +#include "i_disk_mem_usage_listener.h" +#include "ibucketstatechangedhandler.h" +#include "iclusterstatechangedhandler.h" +#include +#include + +namespace storage::spi { struct BucketExecutor; } +namespace searchcorespi::index { struct IThreadService; } + +namespace proton { + +class BlockableMaintenanceJobConfig; +class IBucketStateChangedNotifier; +class IClusterStateChangedNotifier; +class IDiskMemUsageNotifier; +class IBucketModifiedHandler; + +namespace bucketdb { class IBucketCreateNotifier; } + +/** + * Class used to control the moving of buckets between the ready and + * not ready sub databases based on the readiness of buckets according to the cluster state. + * It will first compute the set of buckets to be moved. Then N of these buckets will be iterated in parallel and + * the documents scheduled for move. The movement will happen in 3 phases. + * 1 - Collect meta info for documents. Must happen in master thread + * 2 - Acquire bucket lock and fetch documents and verify against meta data. This is done in BucketExecutor threads. + * 3 - Actual movement is then done in master thread while still holding bucket lock. Once bucket has fully moved + * bucket modified notification is sent. 
+ */ +class BucketMoveJobV2 : public BlockableMaintenanceJob, + public IClusterStateChangedHandler, + public bucketdb::IBucketCreateListener, + public IBucketStateChangedHandler, + public IDiskMemUsageListener +{ +private: + using BucketExecutor = storage::spi::BucketExecutor; + using IDestructorCallback = vespalib::IDestructorCallback; + using IDestructorCallbackSP = std::shared_ptr; + using IThreadService = searchcorespi::index::IThreadService; + using BucketId = document::BucketId; + using ScanIterator = bucketdb::ScanIterator; + using BucketSet = std::map; + using NeedResult = std::pair; + using ActiveState = storage::spi::BucketInfo::ActiveState; + using BucketMover = bucketdb::BucketMover; + using BucketMoverSP = std::shared_ptr; + using Movers = std::vector>; + using MoveKey = BucketMover::MoveKey; + using GuardedMoveOp = BucketMover::GuardedMoveOp; + std::shared_ptr _calc; + IDocumentMoveHandler &_moveHandler; + IBucketModifiedHandler &_modifiedHandler; + IThreadService &_master; + BucketExecutor &_bucketExecutor; + const MaintenanceDocumentSubDB &_ready; + const MaintenanceDocumentSubDB &_notReady; + const document::BucketSpace _bucketSpace; + size_t _iterateCount; + Movers _movers; + BucketSet _buckets2Move; + std::atomic _stopped; + std::atomic _startedCount; + std::atomic _executedCount; + + bucketdb::IBucketCreateNotifier &_bucketCreateNotifier; + IClusterStateChangedNotifier &_clusterStateChangedNotifier; + IBucketStateChangedNotifier &_bucketStateChangedNotifier; + IDiskMemUsageNotifier &_diskMemUsageNotifier; + + void startMove(BucketMoverSP mover, size_t maxDocsToMove); + void prepareMove(BucketMoverSP mover, std::vector keysToMove, IDestructorCallbackSP context); + void completeMove(BucketMoverSP mover, std::vector keys, IDestructorCallbackSP context); + void considerBucket(const bucketdb::Guard & guard, BucketId bucket); + NeedResult needMove(const ScanIterator &itr) const; + BucketSet computeBuckets2Move(); + BucketMoverSP createMover(BucketId 
bucket, bool wantReady); + BucketMoverSP greedyCreateMover(); + void backFillMovers(); + void cancelMovesForBucket(BucketId bucket); + bool moveDocs(size_t maxDocsToMove); +public: + BucketMoveJobV2(const IBucketStateCalculator::SP &calc, + IDocumentMoveHandler &moveHandler, + IBucketModifiedHandler &modifiedHandler, + IThreadService & master, + BucketExecutor & bucketExecutor, + const MaintenanceDocumentSubDB &ready, + const MaintenanceDocumentSubDB ¬Ready, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + const vespalib::string &docTypeName, + document::BucketSpace bucketSpace); + + ~BucketMoveJobV2() override; + + bool scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket); + bool done() const; + void recompute(); + bool inSync() const; + + bool run() override; + void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; + void notifyBucketStateChanged(const BucketId &bucketId, ActiveState newState) override; + void notifyDiskMemUsage(DiskMemUsageState state) override; + void notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) override; + void onStop() override; +}; + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp index 76a65df4d1a..6defd3e7037 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp @@ -20,8 +20,8 @@ namespace proton::bucketdb { typedef IDocumentMetaStore::Iterator Iterator; -MoveOperation::UP -BucketMover::createMoveOperation(const MoveKey &key) { +BucketMover::GuardedMoveOp +BucketMover::createMoveOperation(MoveKey &key) { if 
(_source->lidNeedsCommit(key._lid)) { return {}; } @@ -29,9 +29,10 @@ BucketMover::createMoveOperation(const MoveKey &key) { if (!doc || doc->getId().getGlobalId() != key._gid) return {}; // Failed to retrieve document, removed or changed identity BucketId bucketId = _bucket.stripUnused(); - return std::make_unique(bucketId, key._timestamp, std::move(doc), - DbDocumentId(_source->sub_db_id(), key._lid), - _targetSubDbId); + return BucketMover::GuardedMoveOp(std::make_unique(bucketId, key._timestamp, std::move(doc), + DbDocumentId(_source->sub_db_id(), key._lid), + _targetSubDbId), + std::move(key._guard)); } void @@ -39,21 +40,34 @@ BucketMover::moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone) _handler->handleMove(*moveOp, std::move(onDone)); } +BucketMover::MoveKey::MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp, MoveGuard guard) noexcept + : _lid(lid), + _gid(gid), + _timestamp(timestamp), + _guard(std::move(guard)) +{ } + +BucketMover::MoveKey::~MoveKey() = default; -BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB *source, uint32_t targetSubDbId, - IDocumentMoveHandler &handler, BucketDBOwner &bucketDb) noexcept +BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB *source, + uint32_t targetSubDbId, IDocumentMoveHandler &handler) noexcept : _source(source), _handler(&handler), - _bucketDb(&bucketDb), _bucket(bucket), _targetSubDbId(targetSubDbId), + _started(0), + _completed(0), _bucketDone(false), - _lastGid(), - _lastGidValid(false) + _lastGidValid(false), + _lastGid() { } +BucketMover::~BucketMover() { + assert(inSync()); +} + std::pair, bool> -BucketMover::getKeysToMove(size_t maxDocsToMove) const { +BucketMover::getKeysToMove(size_t maxDocsToMove) { std::pair, bool> result; Iterator itr = (_lastGidValid ? 
_source->meta_store()->upperBound(_lastGid) : _source->meta_store()->lowerBound(_bucket)); @@ -63,7 +77,7 @@ BucketMover::getKeysToMove(size_t maxDocsToMove) const { uint32_t lid = itr.getKey().get_lid(); const RawDocumentMetaData &metaData = _source->meta_store()->getRawMetaData(lid); if (metaData.getBucketUsedBits() == _bucket.getUsedBits()) { - result.first.emplace_back(lid, metaData.getGid(), metaData.getTimestamp()); + result.first.emplace_back(lid, metaData.getGid(), metaData.getTimestamp(), MoveGuard(*this)); ++docsMoved; } } @@ -71,13 +85,13 @@ BucketMover::getKeysToMove(size_t maxDocsToMove) const { return result; } -std::vector -BucketMover::createMoveOperations(const std::vector &toMove) { - std::vector successfulReads; +std::vector +BucketMover::createMoveOperations(std::vector &toMove) { + std::vector successfulReads; successfulReads.reserve(toMove.size()); - for (const MoveKey &key : toMove) { + for (MoveKey &key : toMove) { auto moveOp = createMoveOperation(key); - if (!moveOp) { + if (!moveOp.first) { break; } successfulReads.push_back(std::move(moveOp)); @@ -86,37 +100,10 @@ BucketMover::createMoveOperations(const std::vector &toMove) { } void -BucketMover::moveDocuments(std::vector moveOps, IDestructorCallbackSP onDone) { - for (auto & moveOp : moveOps) { - moveDocument(std::move(moveOp), onDone); - } -} - -bool -BucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter) -{ - if (_bucketDone) { - return true; - } - auto [keys, done] = getKeysToMove(maxDocsToMove); - auto moveOps = createMoveOperations(keys); - bool allOk = keys.size() == moveOps.size(); - if (done && allOk) { - setBucketDone(); - } - if (moveOps.empty()) return allOk; - - updateLastValidGid(moveOps.back()->getDocument()->getId().getGlobalId()); - +BucketMover::moveDocuments(std::vector moveOps, IDestructorCallbackSP onDone) { for (auto & moveOp : moveOps) { - // We cache the bucket for the document we are going to move to avoid getting - // inconsistent 
bucket info (getBucketInfo()) while moving between ready and not-ready - // sub dbs as the bucket info is not updated atomically in this case. - _bucketDb->takeGuard()->cacheBucket(moveOp->getBucketId()); - _handler->handleMove(*moveOp, limiter.beginOperation()); - _bucketDb->takeGuard()->uncacheBucket(); + moveDocument(std::move(moveOp.first), std::move(onDone)); } - return allOk; } } @@ -124,22 +111,51 @@ BucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter) namespace proton { using bucketdb::BucketMover; +using bucketdb::BucketDBOwner; -DocumentBucketMover::DocumentBucketMover(IMoveOperationLimiter &limiter) noexcept +DocumentBucketMover::DocumentBucketMover(IMoveOperationLimiter &limiter, BucketDBOwner &bucketDb) noexcept : _limiter(limiter), + _bucketDb(&bucketDb), _impl() {} void DocumentBucketMover::setupForBucket(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source, - uint32_t targetSubDbId, IDocumentMoveHandler &handler, bucketdb::BucketDBOwner &bucketDb) + uint32_t targetSubDbId, IDocumentMoveHandler &handler) { - _impl = std::make_unique(bucket, source, targetSubDbId, handler, bucketDb); + _impl = std::make_unique(bucket, source, targetSubDbId, handler); } bool DocumentBucketMover::moveDocuments(size_t maxDocsToMove) { - return !_impl || _impl->moveDocuments(maxDocsToMove, _limiter); + return !_impl || moveDocuments(maxDocsToMove, _limiter); +} + +bool +DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter) +{ + if (_impl->bucketDone()) { + return true; + } + auto [keys, done] = _impl->getKeysToMove(maxDocsToMove); + auto moveOps = _impl->createMoveOperations(keys); + bool allOk = keys.size() == moveOps.size(); + if (done && allOk) { + _impl->setBucketDone(); + } + if (moveOps.empty()) return allOk; + + _impl->updateLastValidGid(moveOps.back().first->getDocument()->getId().getGlobalId()); + + for (auto & moveOp : moveOps) { + // We cache the bucket for the document we are 
going to move to avoid getting + // inconsistent bucket info (getBucketInfo()) while moving between ready and not-ready + // sub dbs as the bucket info is not updated atomically in this case. + _bucketDb->takeGuard()->cacheBucket(moveOp.first->getBucketId()); + _impl->moveDocument(std::move(moveOp.first), limiter.beginOperation()); + _bucketDb->takeGuard()->uncacheBucket(); + } + return allOk; } } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h index dc91d40ce4e..c4f94a88cfa 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace vespalib { class IDestructorCallback; } @@ -30,40 +31,60 @@ public: using MoveOperationUP = std::unique_ptr; using IDestructorCallback = vespalib::IDestructorCallback; using IDestructorCallbackSP = std::shared_ptr; + class MoveGuard { + public: + MoveGuard() noexcept : _mover(nullptr) {} + MoveGuard(BucketMover & mover) noexcept + : _mover(&mover) + { + _mover->_started.fetch_add(1, std::memory_order_relaxed); + } + MoveGuard(MoveGuard && rhs) noexcept : _mover(rhs._mover) { rhs._mover = nullptr; } + MoveGuard & operator = (MoveGuard && mover) = delete; + MoveGuard(const MoveGuard & rhs) = delete; + MoveGuard & operator = (const MoveGuard & mover) = delete; + ~MoveGuard() { + if (_mover) { + _mover->_completed.fetch_add(1, std::memory_order_relaxed); + } + } + private: + BucketMover *_mover; + }; struct MoveKey { using Timestamp = storage::spi::Timestamp; - MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp) noexcept - : _lid(lid), - _gid(gid), - _timestamp(timestamp) - { } + MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp, MoveGuard guard) noexcept; + MoveKey(MoveKey &&) noexcept = default; + ~MoveKey(); uint32_t _lid; 
document::GlobalId _gid; Timestamp _timestamp; + MoveGuard _guard; }; - BucketMover(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source, uint32_t targetSubDbId, - IDocumentMoveHandler &handler, BucketDBOwner &bucketDb) noexcept; + BucketMover(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source, + uint32_t targetSubDbId, IDocumentMoveHandler &handler) noexcept; BucketMover(BucketMover &&) noexcept = default; BucketMover & operator=(BucketMover &&) noexcept = delete; BucketMover(const BucketMover &) = delete; BucketMover & operator=(const BucketMover &) = delete; + ~BucketMover(); - // TODO remove once we have switched bucket move job - bool moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter); - + using GuardedMoveOp = std::pair; /// Must be called in master thread - std::pair, bool> getKeysToMove(size_t maxDocsToMove) const; + std::pair, bool> getKeysToMove(size_t maxDocsToMove); /// Call from any thread - std::vector createMoveOperations(const std::vector & toMove); + std::vector createMoveOperations(std::vector & toMove); /// Must be called in master thread - void moveDocuments(std::vector moveOps, IDestructorCallbackSP onDone); + void moveDocuments(std::vector moveOps, IDestructorCallbackSP onDone); + void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone); const document::BucketId &getBucket() const { return _bucket; } void cancel() { setBucketDone(); } void setBucketDone() { _bucketDone = true; } + /// Signals all documents have been scheduled for move bool bucketDone() const { return _bucketDone; } const MaintenanceDocumentSubDB * getSource() const { return _source; } /// Must be called in master thread @@ -71,19 +92,24 @@ public: _lastGid = gid; _lastGidValid = true; } + bool inSync() const { + return pending() == 0; + } private: const MaintenanceDocumentSubDB *_source; IDocumentMoveHandler *_handler; - BucketDBOwner *_bucketDb; const document::BucketId _bucket; const uint32_t 
_targetSubDbId; - bool _bucketDone; - document::GlobalId _lastGid; + std::atomic _started; + std::atomic _completed; + bool _bucketDone; // All moves started, or operation has been cancelled bool _lastGidValid; - - void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone); - MoveOperationUP createMoveOperation(const MoveKey & key); + document::GlobalId _lastGid; + GuardedMoveOp createMoveOperation(MoveKey & key); + size_t pending() const { + return _started.load(std::memory_order_relaxed) - _completed.load(std::memory_order_relaxed); + } }; } @@ -95,10 +121,13 @@ private: class DocumentBucketMover { private: - IMoveOperationLimiter &_limiter; - std::unique_ptr _impl; + IMoveOperationLimiter &_limiter; + bucketdb::BucketDBOwner *_bucketDb; + std::unique_ptr _impl; + + bool moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter); public: - DocumentBucketMover(IMoveOperationLimiter &limiter) noexcept; + DocumentBucketMover(IMoveOperationLimiter &limiter, bucketdb::BucketDBOwner &bucketDb) noexcept; DocumentBucketMover(DocumentBucketMover &&) noexcept = default; DocumentBucketMover & operator=(DocumentBucketMover &&) noexcept = delete; DocumentBucketMover(const DocumentBucketMover &) = delete; @@ -106,8 +135,7 @@ public: void setupForBucket(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source, uint32_t targetSubDbId, - IDocumentMoveHandler &handler, - bucketdb::BucketDBOwner &bucketDb); + IDocumentMoveHandler &handler); const document::BucketId &getBucket() const { return _impl->getBucket(); } bool moveDocuments(size_t maxDocsToMove); void cancel() { _impl->cancel(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index c8429984b1f..ce6acb4795e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ 
b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bucketmovejob.h" +#include "bucketmovejobv2.h" #include "heart_beat_job.h" #include "job_tracked_maintenance_job.h" #include "lid_space_compaction_job.h" @@ -37,8 +38,7 @@ injectLidSpaceCompactionJobs(MaintenanceController &controller, document::BucketSpace bucketSpace) { for (auto &lidHandler : lscHandlers) { - - IMaintenanceJob::UP job; + std::unique_ptr job; if (config.getLidSpaceCompactionConfig().useBucketExecutor()) { job = std::make_unique( config.getLidSpaceCompactionConfig(), @@ -65,6 +65,7 @@ void injectBucketMoveJob(MaintenanceController &controller, const DocumentDBMaintenanceConfig &config, IFrozenBucketHandler &fbHandler, + storage::spi::BucketExecutor & bucketExecutor, bucketdb::IBucketCreateNotifier &bucketCreateNotifier, const vespalib::string &docTypeName, document::BucketSpace bucketSpace, @@ -76,18 +77,35 @@ injectBucketMoveJob(MaintenanceController &controller, DocumentDBJobTrackers &jobTrackers, IDiskMemUsageNotifier &diskMemUsageNotifier) { - auto bmj = std::make_unique(calc, - moveHandler, - bucketModifiedHandler, - controller.getReadySubDB(), - controller.getNotReadySubDB(), - fbHandler, - bucketCreateNotifier, - clusterStateChangedNotifier, - bucketStateChangedNotifier, - diskMemUsageNotifier, - config.getBlockableJobConfig(), - docTypeName, bucketSpace); + std::unique_ptr bmj; + if (config.getBucketMoveConfig().useBucketExecutor()) { + bmj = std::make_unique(calc, + moveHandler, + bucketModifiedHandler, + controller.masterThread(), + bucketExecutor, + controller.getReadySubDB(), + controller.getNotReadySubDB(), + bucketCreateNotifier, + clusterStateChangedNotifier, + bucketStateChangedNotifier, + diskMemUsageNotifier, + config.getBlockableJobConfig(), + docTypeName, bucketSpace); + } else { + bmj = 
std::make_unique(calc, + moveHandler, + bucketModifiedHandler, + controller.getReadySubDB(), + controller.getNotReadySubDB(), + fbHandler, + bucketCreateNotifier, + clusterStateChangedNotifier, + bucketStateChangedNotifier, + diskMemUsageNotifier, + config.getBlockableJobConfig(), + docTypeName, bucketSpace); + } controller.registerJobInMasterThread(trackJob(jobTrackers.getBucketMove(), std::move(bmj))); } @@ -133,9 +151,9 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, clusterStateChangedNotifier, calc, bucketSpace); } - injectBucketMoveJob(controller, config, fbHandler, bucketCreateNotifier, docTypeName, bucketSpace, moveHandler, - bucketModifiedHandler, clusterStateChangedNotifier, bucketStateChangedNotifier, calc, - jobTrackers, diskMemUsageNotifier); + injectBucketMoveJob(controller, config, fbHandler, bucketExecutor, bucketCreateNotifier, docTypeName, bucketSpace, + moveHandler, bucketModifiedHandler, clusterStateChangedNotifier, bucketStateChangedNotifier, + calc, jobTrackers, diskMemUsageNotifier); controller.registerJobInMasterThread( std::make_unique(readyAttributeManager, notReadyAttributeManager, -- cgit v1.2.3