From f28c243cad556de629e0648a0a0b6c441c116cb1 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Wed, 5 May 2021 07:17:10 +0000 Subject: Rename bucket move job --- .../documentdb/documentbucketmover/CMakeLists.txt | 6 +- .../documentbucketmover_test.cpp | 682 +++++++++++++++++++++ .../documentbucketmover_v2_test.cpp | 682 --------------------- .../vespa/searchcore/proton/server/CMakeLists.txt | 2 +- .../searchcore/proton/server/bucketmovejob.cpp | 462 ++++++++++++++ .../vespa/searchcore/proton/server/bucketmovejob.h | 147 +++++ .../searchcore/proton/server/bucketmovejobv2.cpp | 462 -------------- .../searchcore/proton/server/bucketmovejobv2.h | 147 ----- .../proton/server/maintenance_jobs_injector.cpp | 2 +- 9 files changed, 1296 insertions(+), 1296 deletions(-) create mode 100644 searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp delete mode 100644 searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp create mode 100644 searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp create mode 100644 searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h delete mode 100644 searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp delete mode 100644 searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h (limited to 'searchcore') diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt index 88bd84cbd16..1aa0b1c585d 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/CMakeLists.txt @@ -5,9 +5,9 @@ vespa_add_library(searchcore_bucketmover_test STATIC bucketmover_common.cpp ) -vespa_add_executable(searchcore_documentbucketmover_v2_test_app TEST +vespa_add_executable(searchcore_documentbucketmover_test_app TEST SOURCES - documentbucketmover_v2_test.cpp + documentbucketmover_test.cpp DEPENDS searchcore_bucketmover_test searchcore_test @@ -15,7 +15,7 @@ vespa_add_executable(searchcore_documentbucketmover_v2_test_app TEST searchcore_feedoperation GTest::GTest ) -vespa_add_test(NAME searchcore_documentbucketmover_v2_test_app COMMAND searchcore_documentbucketmover_v2_test_app) +vespa_add_test(NAME searchcore_documentbucketmover_test_app COMMAND searchcore_documentbucketmover_test_app) vespa_add_executable(searchcore_scaniterator_test_app TEST SOURCES diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp new file mode 100644 index 00000000000..609f2413bdc --- /dev/null +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -0,0 +1,682 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bucketmover_common.h" +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +LOG_SETUP("document_bucket_mover_test"); + +using namespace proton; +using namespace proton::move::test; +using document::BucketId; +using document::test::makeBucketSpace; +using proton::bucketdb::BucketCreateNotifier; +using storage::spi::BucketInfo; +using BlockedReason = IBlockableMaintenanceJob::BlockedReason; +using MoveOperationVector = std::vector; +using storage::spi::dummy::DummyBucketExecutor; +using vespalib::ThreadStackExecutor; + +struct ControllerFixtureBase : public ::testing::Test +{ + test::UserDocumentsBuilder _builder; + test::BucketStateCalculator::SP _calc; + test::ClusterStateHandler _clusterStateHandler; + test::BucketHandler _bucketHandler; + MyBucketModifiedHandler _modifiedHandler; + std::shared_ptr _bucketDB; + MySubDb _ready; + MySubDb _notReady; + BucketCreateNotifier _bucketCreateNotifier; + test::DiskMemUsageNotifier _diskMemUsageNotifier; + MonitoredRefCount _refCount; + ThreadStackExecutor _singleExecutor; + ExecutorThreadService _master; + DummyBucketExecutor _bucketExecutor; + MyMoveHandler _moveHandler; + DocumentDBTaggedMetrics _metrics; + std::shared_ptr _bmj; + MyCountJobRunner _runner; + ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts); + ~ControllerFixtureBase(); + ControllerFixtureBase &addReady(const BucketId &bucket) { + _calc->addReady(bucket); + return *this; + } + ControllerFixtureBase &remReady(const BucketId &bucket) { + _calc->remReady(bucket); + return *this; + } + ControllerFixtureBase &changeCalc() { + _calc->resetAsked(); + _moveHandler.reset(); + _modifiedHandler.reset(); + _clusterStateHandler.notifyClusterStateChanged(_calc); + return *this; + } + ControllerFixtureBase &activateBucket(const BucketId &bucket) { + _ready.setBucketState(bucket, true); + _bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::ACTIVE); + return *this; + } + ControllerFixtureBase &deactivateBucket(const BucketId &bucket) { + _ready.setBucketState(bucket, false); + _bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::NOT_ACTIVE); + return *this; + } + void failRetrieveForLid(uint32_t lid) { + _ready.failRetrieveForLid(lid); + _notReady.failRetrieveForLid(lid); + } + void fixRetriever() { + _ready.failRetrieveForLid(0); + _notReady.failRetrieveForLid(0); + } + const MoveOperationVector &docsMoved() const { + return _moveHandler._moves; + } + const std::vector &bucketsModified() const { + return _modifiedHandler._modified; + } + const BucketId::List &calcAsked() const { + return _calc->asked(); + } + size_t numPending() { + _bmj->updateMetrics(_metrics); + return _metrics.bucketMove.bucketsPending.getLast(); + } + void runLoop() { + while (!_bmj->isBlocked() && !_bmj->run()) { + } + } + void sync() { + _bucketExecutor.sync(); + _master.sync(); + _master.sync(); // Handle that master schedules onto master again + } + template + void masterExecute(FunctionType &&function) { + _master.execute(vespalib::makeLambdaTask(std::forward(function))); + _master.sync(); + } +}; + +ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts) + : _builder(), + _calc(std::make_shared()), + _bucketHandler(), + _modifiedHandler(), + _bucketDB(std::make_shared()), + _ready(_builder.getRepo(), _bucketDB, 1, SubDbType::READY), + _notReady(_builder.getRepo(), _bucketDB, 2, SubDbType::NOTREADY), + _bucketCreateNotifier(), + _diskMemUsageNotifier(), + _refCount(), + _singleExecutor(1, 0x10000), + _master(_singleExecutor), + _bucketExecutor(4), + _moveHandler(*_bucketDB, storeMoveDoneContexts), + _metrics("test", 1), + _bmj(BucketMoveJobV2::create(_calc, RetainGuard(_refCount), _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb, + _notReady._subDb, _bucketCreateNotifier,_clusterStateHandler, _bucketHandler, + _diskMemUsageNotifier, blockableConfig, "test", makeBucketSpace())), + _runner(*_bmj) +{ +} + +ControllerFixtureBase::~ControllerFixtureBase() = default; +constexpr double RESOURCE_LIMIT_FACTOR = 1.0; +constexpr uint32_t MAX_OUTSTANDING_OPS = 10; +const BlockableMaintenanceJobConfig BLOCKABLE_CONFIG(RESOURCE_LIMIT_FACTOR, MAX_OUTSTANDING_OPS); + +struct ControllerFixture : public ControllerFixtureBase +{ + ControllerFixture(const BlockableMaintenanceJobConfig &blockableConfig = BLOCKABLE_CONFIG) + : ControllerFixtureBase(blockableConfig, blockableConfig.getMaxOutstandingMoveOps() != MAX_OUTSTANDING_OPS) + { + _builder.createDocs(1, 1, 4); // 3 docs + _builder.createDocs(2, 4, 6); // 2 docs + _ready.insertDocs(_builder.getDocs()); + _builder.clearDocs(); + _builder.createDocs(3, 1, 3); // 2 docs + _builder.createDocs(4, 3, 6); // 3 docs + _notReady.insertDocs(_builder.getDocs()); + } +}; + +struct OnlyReadyControllerFixture : public ControllerFixtureBase +{ + OnlyReadyControllerFixture() : ControllerFixtureBase(BLOCKABLE_CONFIG, false) + { + _builder.createDocs(1, 1, 2); // 1 docs + _builder.createDocs(2, 2, 4); // 2 docs + _builder.createDocs(3, 4, 7); // 3 docs + _builder.createDocs(4, 7, 11); // 4 docs + _ready.insertDocs(_builder.getDocs()); + } +}; + +TEST_F(ControllerFixture, require_that_nothing_is_moved_if_bucket_state_says_so) +{ + EXPECT_TRUE(_bmj->done()); + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + _bmj->recompute(); + masterExecute([this]() { + EXPECT_TRUE(_bmj->scanAndMove(4, 3)); + EXPECT_TRUE(_bmj->done()); + }); + EXPECT_TRUE(docsMoved().empty()); + EXPECT_TRUE(bucketsModified().empty()); +} + +TEST_F(ControllerFixture, require_that_not_ready_bucket_is_moved_to_ready_if_bucket_state_says_so) +{ + // bucket 4 should be moved + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + addReady(_notReady.bucket(4)); + + EXPECT_EQ(0, numPending()); + _bmj->recompute(); + EXPECT_EQ(1, numPending()); + masterExecute([this]() { + EXPECT_FALSE(_bmj->done()); + EXPECT_TRUE(_bmj->scanAndMove(4, 3)); + EXPECT_TRUE(_bmj->done()); + }); + sync(); + EXPECT_EQ(0, numPending()); + EXPECT_EQ(3u, docsMoved().size()); + assertEqual(_notReady.bucket(4), _notReady.docs(4)[0], 2, 1, docsMoved()[0]); + assertEqual(_notReady.bucket(4), _notReady.docs(4)[1], 2, 1, docsMoved()[1]); + assertEqual(_notReady.bucket(4), _notReady.docs(4)[2], 2, 1, docsMoved()[2]); + ASSERT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_notReady.bucket(4), bucketsModified()[0]); +} + +TEST_F(ControllerFixture, require_that_ready_bucket_is_moved_to_not_ready_if_bucket_state_says_so) +{ + // bucket 2 should be moved + addReady(_ready.bucket(1)); + _bmj->recompute(); + masterExecute([this]() { + EXPECT_FALSE(_bmj->done()); + EXPECT_TRUE(_bmj->scanAndMove(4, 3)); + EXPECT_TRUE(_bmj->done()); + }); + sync(); + EXPECT_EQ(2u, docsMoved().size()); + assertEqual(_ready.bucket(2), _ready.docs(2)[0], 1, 2, docsMoved()[0]); + assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]); + EXPECT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); +} + +TEST_F(ControllerFixture, require_that_bucket_is_moved_even_with_error) +{ + // bucket 2 should be moved + addReady(_ready.bucket(1)); + _bmj->recompute(); + failRetrieveForLid(5); + masterExecute([this]() { + EXPECT_FALSE(_bmj->done()); + EXPECT_TRUE(_bmj->scanAndMove(4, 3)); + EXPECT_TRUE(_bmj->done()); + }); + sync(); + EXPECT_FALSE(_bmj->done()); + fixRetriever(); + masterExecute([this]() { + EXPECT_TRUE(_bmj->scanAndMove(4, 3)); + EXPECT_TRUE(_bmj->done()); + }); + sync(); + EXPECT_EQ(2u, docsMoved().size()); + assertEqual(_ready.bucket(2), _ready.docs(2)[0], 1, 2, docsMoved()[0]); + assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]); + EXPECT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); +} + + +TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps) +{ + // bucket 2, 3, and 4 should be moved + addReady(_ready.bucket(1)); + addReady(_notReady.bucket(3)); + addReady(_notReady.bucket(4)); + + _bmj->recompute(); + EXPECT_EQ(3, numPending()); + masterExecute([this]() { + EXPECT_FALSE(_bmj->done()); + + EXPECT_FALSE(_bmj->scanAndMove(1, 2)); + EXPECT_FALSE(_bmj->done()); + }); + sync(); + EXPECT_EQ(2, numPending()); + EXPECT_EQ(2u, docsMoved().size()); + + masterExecute([this]() { + EXPECT_FALSE(_bmj->scanAndMove(1, 2)); + EXPECT_FALSE(_bmj->done()); + }); + sync(); + EXPECT_EQ(2, numPending()); + EXPECT_EQ(4u, docsMoved().size()); + + masterExecute([this]() { + EXPECT_FALSE(_bmj->scanAndMove(1, 2)); + EXPECT_FALSE(_bmj->done()); + }); + sync(); + EXPECT_EQ(1, numPending()); + EXPECT_EQ(6u, docsMoved().size()); + + // move bucket 4, docs 3 + masterExecute([this]() { + EXPECT_TRUE(_bmj->scanAndMove(1, 2)); + EXPECT_TRUE(_bmj->done()); + }); + sync(); + EXPECT_EQ(0, numPending()); + EXPECT_EQ(7u, docsMoved().size()); + EXPECT_EQ(3u, bucketsModified().size()); + EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); + EXPECT_EQ(_notReady.bucket(3), bucketsModified()[1]); + EXPECT_EQ(_notReady.bucket(4), bucketsModified()[2]); +} + +TEST_F(ControllerFixture, require_that_last_bucket_is_moved_before_reporting_done) +{ + // bucket 4 should be moved + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + addReady(_notReady.bucket(4)); + _bmj->recompute(); + masterExecute([this]() { + EXPECT_FALSE(_bmj->done()); + + EXPECT_FALSE(_bmj->scanAndMove(1, 1)); + EXPECT_FALSE(_bmj->done()); + }); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); + masterExecute([this]() { + EXPECT_TRUE(_bmj->scanAndMove(1, 2)); + EXPECT_TRUE(_bmj->done()); + }); + sync(); + EXPECT_EQ(3u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); +} + + +TEST_F(ControllerFixture, require_that_active_bucket_is_not_moved_from_ready_to_not_ready_until_being_not_active) +{ + // bucket 1 should be moved but is active + addReady(_ready.bucket(2)); + _bmj->recompute(); + EXPECT_FALSE(_bmj->done()); + activateBucket(_ready.bucket(1)); + masterExecute([this]() { + EXPECT_TRUE(_bmj->scanAndMove(4, 3)); // scan all, delay active bucket 1 + EXPECT_TRUE(_bmj->done()); + }); + sync(); + EXPECT_EQ(0u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + + deactivateBucket(_ready.bucket(1)); + masterExecute([this]() { + EXPECT_FALSE(_bmj->done()); + EXPECT_TRUE(_bmj->scanAndMove(4, 3)); // move delayed and de-activated bucket 1 + EXPECT_TRUE(_bmj->done()); + }); + sync(); + EXPECT_EQ(3u, docsMoved().size()); + EXPECT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]); +} + +TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_we_change_calculator) +{ + // bucket 1 should be moved + addReady(_ready.bucket(2)); + + masterExecute([this]() { + _bmj->recompute(); + _bmj->scanAndMove(1, 1); + EXPECT_FALSE(_bmj->done()); + }); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); + masterExecute([this]() { + changeCalc(); // Not cancelled, bucket 1 still moving to notReady + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); + _calc->resetAsked(); + _bmj->scanAndMove(1, 1); + EXPECT_FALSE(_bmj->done()); + }); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(0u, calcAsked().size()); + addReady(_ready.bucket(1)); + masterExecute([this]() { + changeCalc(); // cancelled, bucket 1 no longer moving to notReady + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); + _calc->resetAsked(); + remReady(_ready.bucket(1)); + _calc->resetAsked(); + changeCalc(); // not cancelled. No active bucket move + EXPECT_EQ(4u, calcAsked().size()); + _bmj->scanAndMove(1, 1); + }); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_ready.bucket(2), calcAsked()[1]); + EXPECT_EQ(_notReady.bucket(3), calcAsked()[2]); + masterExecute([this]() { + _bmj->scanAndMove(2, 3); + }); + EXPECT_TRUE(_bmj->done()); + sync(); + EXPECT_EQ(3u, docsMoved().size()); + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_notReady.bucket(4), calcAsked()[3]); + EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); +} + +TEST_F(ControllerFixture, require_that_de_activated_bucket_is_not_moved_if_new_calculator_does_not_say_so) +{ + // bucket 1 should be moved + addReady(_ready.bucket(2)); + _bmj->recompute(); + masterExecute([this]() { + activateBucket(_ready.bucket(1)); + _bmj->scanAndMove(4, 3); // scan all, delay active bucket 1 + }); + sync(); + EXPECT_EQ(0u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + + masterExecute([this]() { + deactivateBucket(_ready.bucket(1)); + addReady(_ready.bucket(1)); + changeCalc(); + _bmj->scanAndMove(4, 3); // consider delayed bucket 3 + }); + sync(); + EXPECT_EQ(0u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + EXPECT_EQ(4u, calcAsked().size()); + EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); +} + +TEST_F(ControllerFixture, ready_bucket_not_moved_to_not_ready_if_node_is_marked_as_retired) +{ + _calc->setNodeRetired(true); + // Bucket 2 would be moved from ready to not ready in a non-retired case, but not when retired. + addReady(_ready.bucket(1)); + masterExecute([this]() { + _bmj->recompute(); + _bmj->scanAndMove(4, 3); + EXPECT_TRUE(_bmj->done()); + }); + sync(); + EXPECT_EQ(0u, docsMoved().size()); +} + +// Technically this should never happen since a retired node is never in the ideal state, +// but test this case for the sake of completion. +TEST_F(ControllerFixture, inactive_not_ready_bucket_not_moved_to_ready_if_node_is_marked_as_retired) +{ + _calc->setNodeRetired(true); + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + addReady(_notReady.bucket(3)); + masterExecute([this]() { + _bmj->recompute(); + _bmj->scanAndMove(4, 3); + EXPECT_TRUE(_bmj->done()); + }); + sync(); + EXPECT_EQ(0u, docsMoved().size()); +} + +TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_ready_even_if_node_is_marked_as_retired) +{ + _calc->setNodeRetired(true); + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + addReady(_notReady.bucket(3)); + _bmj->recompute(); + masterExecute([this]() { + activateBucket(_notReady.bucket(3)); + _bmj->scanAndMove(4, 3); + EXPECT_TRUE(_bmj->done()); + }); + sync(); + ASSERT_EQ(2u, docsMoved().size()); + assertEqual(_notReady.bucket(3), _notReady.docs(3)[0], 2, 1, docsMoved()[0]); + assertEqual(_notReady.bucket(3), _notReady.docs(3)[1], 2, 1, docsMoved()[1]); + ASSERT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]); +} + + +TEST_F(ControllerFixture, require_that_notifyCreateBucket_causes_bucket_to_be_reconsidered_by_job) +{ + EXPECT_TRUE(_bmj->done()); + addReady(_ready.bucket(1)); + addReady(_ready.bucket(2)); + runLoop(); + EXPECT_TRUE(_bmj->done()); + sync(); + EXPECT_TRUE(docsMoved().empty()); + EXPECT_TRUE(bucketsModified().empty()); + addReady(_notReady.bucket(3)); // bucket 3 now ready, no notify + EXPECT_TRUE(_bmj->done()); // move job still believes work done + sync(); + EXPECT_TRUE(bucketsModified().empty()); + masterExecute([this]() { + _bmj->notifyCreateBucket(_bucketDB->takeGuard(), _notReady.bucket(3)); // reconsider bucket 3 + EXPECT_FALSE(_bmj->done()); + EXPECT_TRUE(bucketsModified().empty()); + }); + sync(); + EXPECT_TRUE(bucketsModified().empty()); + runLoop(); + EXPECT_TRUE(_bmj->done()); + sync(); + + EXPECT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(2u, docsMoved().size()); +} + + +struct ResourceLimitControllerFixture : public ControllerFixture +{ + ResourceLimitControllerFixture(double resourceLimitFactor = RESOURCE_LIMIT_FACTOR) : + ControllerFixture(BlockableMaintenanceJobConfig(resourceLimitFactor, MAX_OUTSTANDING_OPS)) + {} + + void testJobStopping(DiskMemUsageState blockingUsageState) { + // Bucket 1 should be moved + addReady(_ready.bucket(2)); + _bmj->recompute(); + EXPECT_FALSE(_bmj->done()); + // Note: This depends on _bmj->run() moving max 1 documents + EXPECT_FALSE(_bmj->run()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + // Notify that we've over limit + _diskMemUsageNotifier.notify(blockingUsageState); + EXPECT_TRUE(_bmj->run()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + // Notify that we've under limit + _diskMemUsageNotifier.notify(DiskMemUsageState()); + EXPECT_FALSE(_bmj->run()); + sync(); + EXPECT_EQ(2u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + } + + void testJobNotStopping(DiskMemUsageState blockingUsageState) { + // Bucket 1 should be moved + addReady(_ready.bucket(2)); + _bmj->recompute(); + EXPECT_FALSE(_bmj->done()); + // Note: This depends on _bmj->run() moving max 1 documents + EXPECT_FALSE(_bmj->run()); + sync(); + EXPECT_EQ(1u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + // Notify that we've over limit, but not over adjusted limit + _diskMemUsageNotifier.notify(blockingUsageState); + EXPECT_FALSE(_bmj->run()); + sync(); + EXPECT_EQ(2u, docsMoved().size()); + EXPECT_EQ(0u, bucketsModified().size()); + } +}; + +struct ResourceLimitControllerFixture_1_2 : public ResourceLimitControllerFixture { + ResourceLimitControllerFixture_1_2() : ResourceLimitControllerFixture(1.2) {} +}; + +TEST_F(ResourceLimitControllerFixture, require_that_bucket_move_stops_when_disk_limit_is_reached) +{ + testJobStopping(DiskMemUsageState(ResourceUsageState(0.7, 0.8), ResourceUsageState())); +} + +TEST_F(ResourceLimitControllerFixture, require_that_bucket_move_stops_when_memory_limit_is_reached) +{ + testJobStopping(DiskMemUsageState(ResourceUsageState(), ResourceUsageState(0.7, 0.8))); +} + +TEST_F(ResourceLimitControllerFixture_1_2, require_that_bucket_move_uses_resource_limit_factor_for_disk_resource_limit) +{ + testJobNotStopping(DiskMemUsageState(ResourceUsageState(0.7, 0.8), ResourceUsageState())); +} + +TEST_F(ResourceLimitControllerFixture_1_2, require_that_bucket_move_uses_resource_limit_factor_for_memory_resource_limit) +{ + testJobNotStopping(DiskMemUsageState(ResourceUsageState(), ResourceUsageState(0.7, 0.8))); +} + + +struct MaxOutstandingMoveOpsFixture : public ControllerFixtureBase +{ + MaxOutstandingMoveOpsFixture(uint32_t maxOutstandingOps) + : ControllerFixtureBase(BlockableMaintenanceJobConfig(RESOURCE_LIMIT_FACTOR, maxOutstandingOps), true) + { + _builder.createDocs(1, 1, 2); + _builder.createDocs(2, 2, 3); + _builder.createDocs(3, 3, 4); + _builder.createDocs(4, 4, 5); + _ready.insertDocs(_builder.getDocs()); + _builder.clearDocs(); + _builder.createDocs(11, 1, 2); + _builder.createDocs(12, 2, 3); + _builder.createDocs(13, 3, 4); + _builder.createDocs(14, 4, 5); + _notReady.insertDocs(_builder.getDocs()); + addReady(_ready.bucket(3)); + _bmj->recompute(); + } + + void assertRunToBlocked() { + EXPECT_TRUE(_bmj->run()); // job becomes blocked as max outstanding limit is reached + EXPECT_FALSE(_bmj->done()); + EXPECT_TRUE(_bmj->isBlocked()); + EXPECT_TRUE(_bmj->isBlocked(BlockedReason::OUTSTANDING_OPS)); + } + void assertRunToNotBlocked() { + EXPECT_FALSE(_bmj->run()); + EXPECT_FALSE(_bmj->done()); + EXPECT_FALSE(_bmj->isBlocked()); + } + void assertRunToFinished() { + EXPECT_TRUE(_bmj->run()); + EXPECT_TRUE(_bmj->done()); + EXPECT_FALSE(_bmj->isBlocked()); + } + void assertDocsMoved(uint32_t expDocsMovedCnt, uint32_t expMoveContextsCnt) { + EXPECT_EQ(expDocsMovedCnt, docsMoved().size()); + EXPECT_EQ(expMoveContextsCnt, _moveHandler._moveDoneContexts.size()); + } + void unblockJob(uint32_t expRunnerCnt) { + _moveHandler.clearMoveDoneContexts(); // unblocks job and try to execute it via runner + EXPECT_EQ(expRunnerCnt, _runner.runCount); + EXPECT_FALSE(_bmj->isBlocked()); + } +}; + +struct MaxOutstandingMoveOpsFixture_1 : public MaxOutstandingMoveOpsFixture { + MaxOutstandingMoveOpsFixture_1() : MaxOutstandingMoveOpsFixture(1) {} +}; + +struct MaxOutstandingMoveOpsFixture_2 : public MaxOutstandingMoveOpsFixture { + MaxOutstandingMoveOpsFixture_2() : MaxOutstandingMoveOpsFixture(2) {} +}; + +TEST_F(MaxOutstandingMoveOpsFixture_1, require_that_bucket_move_job_is_blocked_if_it_has_too_many_outstanding_move_operations_max_1) +{ + assertRunToBlocked(); + sync(); + assertDocsMoved(1, 1); + assertRunToBlocked(); + assertDocsMoved(1, 1); + + unblockJob(1); + assertRunToBlocked(); + sync(); + assertDocsMoved(2, 1); + + unblockJob(2); + assertRunToBlocked(); + sync(); + assertDocsMoved(3, 1); + + unblockJob(3); + assertRunToFinished(); + sync(); + assertDocsMoved(3, 0); +} + +TEST_F(MaxOutstandingMoveOpsFixture_2, require_that_bucket_move_job_is_blocked_if_it_has_too_many_outstanding_move_operations_max_2) +{ + assertRunToNotBlocked(); + sync(); + assertDocsMoved(1, 1); + + assertRunToBlocked(); + sync(); + assertDocsMoved(2, 2); + + unblockJob(1); + assertRunToFinished(); + sync(); + assertDocsMoved(3, 1); +} + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp deleted file mode 100644 index 8dcad91f69a..00000000000 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp +++ /dev/null @@ -1,682 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "bucketmover_common.h" -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -LOG_SETUP("document_bucket_mover_test"); - -using namespace proton; -using namespace proton::move::test; -using document::BucketId; -using document::test::makeBucketSpace; -using proton::bucketdb::BucketCreateNotifier; -using storage::spi::BucketInfo; -using BlockedReason = IBlockableMaintenanceJob::BlockedReason; -using MoveOperationVector = std::vector; -using storage::spi::dummy::DummyBucketExecutor; -using vespalib::ThreadStackExecutor; - -struct ControllerFixtureBase : public ::testing::Test -{ - test::UserDocumentsBuilder _builder; - test::BucketStateCalculator::SP _calc; - test::ClusterStateHandler _clusterStateHandler; - test::BucketHandler _bucketHandler; - MyBucketModifiedHandler _modifiedHandler; - std::shared_ptr _bucketDB; - MySubDb _ready; - MySubDb _notReady; - BucketCreateNotifier _bucketCreateNotifier; - test::DiskMemUsageNotifier _diskMemUsageNotifier; - MonitoredRefCount _refCount; - ThreadStackExecutor _singleExecutor; - ExecutorThreadService _master; - DummyBucketExecutor _bucketExecutor; - MyMoveHandler _moveHandler; - DocumentDBTaggedMetrics _metrics; - std::shared_ptr _bmj; - MyCountJobRunner _runner; - ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts); - ~ControllerFixtureBase(); - ControllerFixtureBase &addReady(const BucketId &bucket) { - _calc->addReady(bucket); - return *this; - } - ControllerFixtureBase &remReady(const BucketId &bucket) { - _calc->remReady(bucket); - return *this; - } - ControllerFixtureBase &changeCalc() { - _calc->resetAsked(); - _moveHandler.reset(); - _modifiedHandler.reset(); - _clusterStateHandler.notifyClusterStateChanged(_calc); - return *this; - } - ControllerFixtureBase &activateBucket(const BucketId &bucket) { - _ready.setBucketState(bucket, true); - _bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::ACTIVE); - return *this; - } - ControllerFixtureBase &deactivateBucket(const BucketId &bucket) { - _ready.setBucketState(bucket, false); - _bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::NOT_ACTIVE); - return *this; - } - void failRetrieveForLid(uint32_t lid) { - _ready.failRetrieveForLid(lid); - _notReady.failRetrieveForLid(lid); - } - void fixRetriever() { - _ready.failRetrieveForLid(0); - _notReady.failRetrieveForLid(0); - } - const MoveOperationVector &docsMoved() const { - return _moveHandler._moves; - } - const std::vector &bucketsModified() const { - return _modifiedHandler._modified; - } - const BucketId::List &calcAsked() const { - return _calc->asked(); - } - size_t numPending() { - _bmj->updateMetrics(_metrics); - return _metrics.bucketMove.bucketsPending.getLast(); - } - void runLoop() { - while (!_bmj->isBlocked() && !_bmj->run()) { - } - } - void sync() { - _bucketExecutor.sync(); - _master.sync(); - _master.sync(); // Handle that master schedules onto master again - } - template - void masterExecute(FunctionType &&function) { - _master.execute(vespalib::makeLambdaTask(std::forward(function))); - _master.sync(); - } -}; - -ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts) - : _builder(), - _calc(std::make_shared()), - _bucketHandler(), - _modifiedHandler(), - _bucketDB(std::make_shared()), - _ready(_builder.getRepo(), _bucketDB, 1, SubDbType::READY), - _notReady(_builder.getRepo(), _bucketDB, 2, SubDbType::NOTREADY), - _bucketCreateNotifier(), - _diskMemUsageNotifier(), - _refCount(), - _singleExecutor(1, 0x10000), - _master(_singleExecutor), - _bucketExecutor(4), - _moveHandler(*_bucketDB, storeMoveDoneContexts), - _metrics("test", 1), - _bmj(BucketMoveJobV2::create(_calc, RetainGuard(_refCount), _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb, - _notReady._subDb, _bucketCreateNotifier,_clusterStateHandler, _bucketHandler, - _diskMemUsageNotifier, blockableConfig, "test", makeBucketSpace())), - _runner(*_bmj) -{ -} - -ControllerFixtureBase::~ControllerFixtureBase() = default; -constexpr double RESOURCE_LIMIT_FACTOR = 1.0; -constexpr uint32_t MAX_OUTSTANDING_OPS = 10; -const BlockableMaintenanceJobConfig BLOCKABLE_CONFIG(RESOURCE_LIMIT_FACTOR, MAX_OUTSTANDING_OPS); - -struct ControllerFixture : public ControllerFixtureBase -{ - ControllerFixture(const BlockableMaintenanceJobConfig &blockableConfig = BLOCKABLE_CONFIG) - : ControllerFixtureBase(blockableConfig, blockableConfig.getMaxOutstandingMoveOps() != MAX_OUTSTANDING_OPS) - { - _builder.createDocs(1, 1, 4); // 3 docs - _builder.createDocs(2, 4, 6); // 2 docs - _ready.insertDocs(_builder.getDocs()); - _builder.clearDocs(); - _builder.createDocs(3, 1, 3); // 2 docs - _builder.createDocs(4, 3, 6); // 3 docs - _notReady.insertDocs(_builder.getDocs()); - } -}; - -struct OnlyReadyControllerFixture : public ControllerFixtureBase -{ - OnlyReadyControllerFixture() : ControllerFixtureBase(BLOCKABLE_CONFIG, false) - { - _builder.createDocs(1, 1, 2); // 1 docs - _builder.createDocs(2, 2, 4); // 2 docs - _builder.createDocs(3, 4, 7); // 3 docs - _builder.createDocs(4, 7, 11); // 4 docs - _ready.insertDocs(_builder.getDocs()); - } -}; - -TEST_F(ControllerFixture, require_that_nothing_is_moved_if_bucket_state_says_so) -{ - EXPECT_TRUE(_bmj->done()); - addReady(_ready.bucket(1)); - addReady(_ready.bucket(2)); - _bmj->recompute(); - masterExecute([this]() { - EXPECT_TRUE(_bmj->scanAndMove(4, 3)); - EXPECT_TRUE(_bmj->done()); - }); - EXPECT_TRUE(docsMoved().empty()); - EXPECT_TRUE(bucketsModified().empty()); -} - -TEST_F(ControllerFixture, require_that_not_ready_bucket_is_moved_to_ready_if_bucket_state_says_so) -{ - // bucket 4 should be moved - addReady(_ready.bucket(1)); - addReady(_ready.bucket(2)); - addReady(_notReady.bucket(4)); - - EXPECT_EQ(0, numPending()); - _bmj->recompute(); - EXPECT_EQ(1, numPending()); - masterExecute([this]() { - EXPECT_FALSE(_bmj->done()); - EXPECT_TRUE(_bmj->scanAndMove(4, 3)); - EXPECT_TRUE(_bmj->done()); - }); - sync(); - EXPECT_EQ(0, numPending()); - EXPECT_EQ(3u, docsMoved().size()); - assertEqual(_notReady.bucket(4), _notReady.docs(4)[0], 2, 1, docsMoved()[0]); - assertEqual(_notReady.bucket(4), _notReady.docs(4)[1], 2, 1, docsMoved()[1]); - assertEqual(_notReady.bucket(4), _notReady.docs(4)[2], 2, 1, docsMoved()[2]); - ASSERT_EQ(1u, bucketsModified().size()); - EXPECT_EQ(_notReady.bucket(4), bucketsModified()[0]); -} - -TEST_F(ControllerFixture, require_that_ready_bucket_is_moved_to_not_ready_if_bucket_state_says_so) -{ - // bucket 2 should be moved - addReady(_ready.bucket(1)); - _bmj->recompute(); - masterExecute([this]() { - EXPECT_FALSE(_bmj->done()); - EXPECT_TRUE(_bmj->scanAndMove(4, 3)); - EXPECT_TRUE(_bmj->done()); - }); - sync(); - EXPECT_EQ(2u, docsMoved().size()); - assertEqual(_ready.bucket(2), _ready.docs(2)[0], 1, 2, docsMoved()[0]); - assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]); - EXPECT_EQ(1u, bucketsModified().size()); - EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); -} - -TEST_F(ControllerFixture, require_that_bucket_is_moved_even_with_error) -{ - // bucket 2 should be moved - addReady(_ready.bucket(1)); - _bmj->recompute(); - failRetrieveForLid(5); - masterExecute([this]() { - EXPECT_FALSE(_bmj->done()); - EXPECT_TRUE(_bmj->scanAndMove(4, 3)); - EXPECT_TRUE(_bmj->done()); - }); - sync(); - EXPECT_FALSE(_bmj->done()); - fixRetriever(); - masterExecute([this]() { - EXPECT_TRUE(_bmj->scanAndMove(4, 3)); - EXPECT_TRUE(_bmj->done()); - }); - sync(); - EXPECT_EQ(2u, docsMoved().size()); - assertEqual(_ready.bucket(2), _ready.docs(2)[0], 1, 2, docsMoved()[0]); - assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]); - EXPECT_EQ(1u, bucketsModified().size()); - EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); -} - - -TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps) -{ - // bucket 2, 3, and 4 should be moved - addReady(_ready.bucket(1)); - addReady(_notReady.bucket(3)); - addReady(_notReady.bucket(4)); - - _bmj->recompute(); - EXPECT_EQ(3, numPending()); - masterExecute([this]() { - EXPECT_FALSE(_bmj->done()); - - EXPECT_FALSE(_bmj->scanAndMove(1, 2)); - EXPECT_FALSE(_bmj->done()); - }); - sync(); - EXPECT_EQ(2, numPending()); - EXPECT_EQ(2u, docsMoved().size()); - - masterExecute([this]() { - EXPECT_FALSE(_bmj->scanAndMove(1, 2)); - EXPECT_FALSE(_bmj->done()); - }); - sync(); - EXPECT_EQ(2, numPending()); - EXPECT_EQ(4u, docsMoved().size()); - - masterExecute([this]() { - EXPECT_FALSE(_bmj->scanAndMove(1, 2)); - EXPECT_FALSE(_bmj->done()); - }); - sync(); - EXPECT_EQ(1, numPending()); - EXPECT_EQ(6u, docsMoved().size()); - - // move bucket 4, docs 3 - masterExecute([this]() { - EXPECT_TRUE(_bmj->scanAndMove(1, 2)); - EXPECT_TRUE(_bmj->done()); - }); - sync(); - EXPECT_EQ(0, numPending()); - EXPECT_EQ(7u, docsMoved().size()); - EXPECT_EQ(3u, bucketsModified().size()); - EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); - EXPECT_EQ(_notReady.bucket(3), bucketsModified()[1]); - EXPECT_EQ(_notReady.bucket(4), bucketsModified()[2]); -} - -TEST_F(ControllerFixture, require_that_last_bucket_is_moved_before_reporting_done) -{ - // bucket 4 should be moved - addReady(_ready.bucket(1)); - addReady(_ready.bucket(2)); - addReady(_notReady.bucket(4)); - _bmj->recompute(); - masterExecute([this]() { - EXPECT_FALSE(_bmj->done()); - - EXPECT_FALSE(_bmj->scanAndMove(1, 1)); - EXPECT_FALSE(_bmj->done()); - }); - sync(); - EXPECT_EQ(1u, docsMoved().size()); - EXPECT_EQ(4u, calcAsked().size()); - masterExecute([this]() { - EXPECT_TRUE(_bmj->scanAndMove(1, 2)); - EXPECT_TRUE(_bmj->done()); - }); - sync(); - EXPECT_EQ(3u, docsMoved().size()); - EXPECT_EQ(4u, calcAsked().size()); -} - - -TEST_F(ControllerFixture, require_that_active_bucket_is_not_moved_from_ready_to_not_ready_until_being_not_active) -{ - // bucket 1 should be moved but is active - addReady(_ready.bucket(2)); - _bmj->recompute(); - EXPECT_FALSE(_bmj->done()); - activateBucket(_ready.bucket(1)); - masterExecute([this]() { - EXPECT_TRUE(_bmj->scanAndMove(4, 3)); // scan all, delay active bucket 1 - EXPECT_TRUE(_bmj->done()); - }); - sync(); - EXPECT_EQ(0u, docsMoved().size()); - EXPECT_EQ(0u, bucketsModified().size()); - - deactivateBucket(_ready.bucket(1)); - masterExecute([this]() { - EXPECT_FALSE(_bmj->done()); - EXPECT_TRUE(_bmj->scanAndMove(4, 3)); // move delayed and de-activated bucket 1 - EXPECT_TRUE(_bmj->done()); - }); - sync(); - EXPECT_EQ(3u, docsMoved().size()); - EXPECT_EQ(1u, bucketsModified().size()); - EXPECT_EQ(_ready.bucket(1), bucketsModified()[0]); -} - -TEST_F(ControllerFixture, require_that_current_bucket_moving_is_cancelled_when_we_change_calculator) -{ - // bucket 1 should be moved - addReady(_ready.bucket(2)); - - masterExecute([this]() { - _bmj->recompute(); - _bmj->scanAndMove(1, 1); - EXPECT_FALSE(_bmj->done()); - }); - sync(); - EXPECT_EQ(1u, docsMoved().size()); - EXPECT_EQ(4u, calcAsked().size()); - masterExecute([this]() { - changeCalc(); // Not cancelled, bucket 1 still moving to notReady - EXPECT_EQ(4u, calcAsked().size()); - EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); - _calc->resetAsked(); - _bmj->scanAndMove(1, 1); - EXPECT_FALSE(_bmj->done()); - }); - sync(); - EXPECT_EQ(1u, docsMoved().size()); - EXPECT_EQ(0u, calcAsked().size()); - addReady(_ready.bucket(1)); - masterExecute([this]() { - changeCalc(); // cancelled, bucket 1 no longer moving to notReady - EXPECT_EQ(4u, calcAsked().size()); - EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); - _calc->resetAsked(); - remReady(_ready.bucket(1)); - _calc->resetAsked(); - changeCalc(); // not cancelled. No active bucket move - EXPECT_EQ(4u, calcAsked().size()); - _bmj->scanAndMove(1, 1); - }); - sync(); - EXPECT_EQ(1u, docsMoved().size()); - EXPECT_EQ(4u, calcAsked().size()); - EXPECT_EQ(_ready.bucket(2), calcAsked()[1]); - EXPECT_EQ(_notReady.bucket(3), calcAsked()[2]); - masterExecute([this]() { - _bmj->scanAndMove(2, 3); - }); - EXPECT_TRUE(_bmj->done()); - sync(); - EXPECT_EQ(3u, docsMoved().size()); - EXPECT_EQ(4u, calcAsked().size()); - EXPECT_EQ(_notReady.bucket(4), calcAsked()[3]); - EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); -} - -TEST_F(ControllerFixture, require_that_de_activated_bucket_is_not_moved_if_new_calculator_does_not_say_so) -{ - // bucket 1 should be moved - addReady(_ready.bucket(2)); - _bmj->recompute(); - masterExecute([this]() { - activateBucket(_ready.bucket(1)); - _bmj->scanAndMove(4, 3); // scan all, delay active bucket 1 - }); - sync(); - EXPECT_EQ(0u, docsMoved().size()); - EXPECT_EQ(0u, bucketsModified().size()); - - masterExecute([this]() { - deactivateBucket(_ready.bucket(1)); - addReady(_ready.bucket(1)); - changeCalc(); - _bmj->scanAndMove(4, 3); // consider delayed bucket 3 - }); - sync(); - EXPECT_EQ(0u, docsMoved().size()); - EXPECT_EQ(0u, bucketsModified().size()); - EXPECT_EQ(4u, calcAsked().size()); - EXPECT_EQ(_ready.bucket(1), calcAsked()[0]); -} - -TEST_F(ControllerFixture, ready_bucket_not_moved_to_not_ready_if_node_is_marked_as_retired) -{ - _calc->setNodeRetired(true); - // Bucket 2 would be moved from ready to not ready in a non-retired case, but not when retired. - addReady(_ready.bucket(1)); - masterExecute([this]() { - _bmj->recompute(); - _bmj->scanAndMove(4, 3); - EXPECT_TRUE(_bmj->done()); - }); - sync(); - EXPECT_EQ(0u, docsMoved().size()); -} - -// Technically this should never happen since a retired node is never in the ideal state, -// but test this case for the sake of completion. -TEST_F(ControllerFixture, inactive_not_ready_bucket_not_moved_to_ready_if_node_is_marked_as_retired) -{ - _calc->setNodeRetired(true); - addReady(_ready.bucket(1)); - addReady(_ready.bucket(2)); - addReady(_notReady.bucket(3)); - masterExecute([this]() { - _bmj->recompute(); - _bmj->scanAndMove(4, 3); - EXPECT_TRUE(_bmj->done()); - }); - sync(); - EXPECT_EQ(0u, docsMoved().size()); -} - -TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_ready_even_if_node_is_marked_as_retired) -{ - _calc->setNodeRetired(true); - addReady(_ready.bucket(1)); - addReady(_ready.bucket(2)); - addReady(_notReady.bucket(3)); - _bmj->recompute(); - masterExecute([this]() { - activateBucket(_notReady.bucket(3)); - _bmj->scanAndMove(4, 3); - EXPECT_TRUE(_bmj->done()); - }); - sync(); - ASSERT_EQ(2u, docsMoved().size()); - assertEqual(_notReady.bucket(3), _notReady.docs(3)[0], 2, 1, docsMoved()[0]); - assertEqual(_notReady.bucket(3), _notReady.docs(3)[1], 2, 1, docsMoved()[1]); - ASSERT_EQ(1u, bucketsModified().size()); - EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]); -} - - -TEST_F(ControllerFixture, require_that_notifyCreateBucket_causes_bucket_to_be_reconsidered_by_job) -{ - EXPECT_TRUE(_bmj->done()); - addReady(_ready.bucket(1)); - addReady(_ready.bucket(2)); - runLoop(); - EXPECT_TRUE(_bmj->done()); - sync(); - EXPECT_TRUE(docsMoved().empty()); - EXPECT_TRUE(bucketsModified().empty()); - addReady(_notReady.bucket(3)); // bucket 3 now ready, no notify - EXPECT_TRUE(_bmj->done()); // move job still believes work done - sync(); - EXPECT_TRUE(bucketsModified().empty()); - masterExecute([this]() { - _bmj->notifyCreateBucket(_bucketDB->takeGuard(), _notReady.bucket(3)); // reconsider bucket 3 - EXPECT_FALSE(_bmj->done()); - EXPECT_TRUE(bucketsModified().empty()); - }); - sync(); - EXPECT_TRUE(bucketsModified().empty()); - runLoop(); - EXPECT_TRUE(_bmj->done()); - sync(); - - EXPECT_EQ(1u, bucketsModified().size()); - EXPECT_EQ(2u, docsMoved().size()); -} - - -struct ResourceLimitControllerFixture : public ControllerFixture -{ - ResourceLimitControllerFixture(double resourceLimitFactor = RESOURCE_LIMIT_FACTOR) : - ControllerFixture(BlockableMaintenanceJobConfig(resourceLimitFactor, MAX_OUTSTANDING_OPS)) - {} - - void testJobStopping(DiskMemUsageState blockingUsageState) { - // Bucket 1 should be moved - addReady(_ready.bucket(2)); - _bmj->recompute(); - EXPECT_FALSE(_bmj->done()); - // Note: This depends on _bmj->run() moving max 1 documents - EXPECT_FALSE(_bmj->run()); - sync(); - EXPECT_EQ(1u, docsMoved().size()); - EXPECT_EQ(0u, bucketsModified().size()); - // Notify that we've over limit - _diskMemUsageNotifier.notify(blockingUsageState); - EXPECT_TRUE(_bmj->run()); - sync(); - EXPECT_EQ(1u, docsMoved().size()); - EXPECT_EQ(0u, bucketsModified().size()); - // Notify that we've under limit - _diskMemUsageNotifier.notify(DiskMemUsageState()); - EXPECT_FALSE(_bmj->run()); - sync(); - EXPECT_EQ(2u, docsMoved().size()); - EXPECT_EQ(0u, bucketsModified().size()); - } - - void testJobNotStopping(DiskMemUsageState blockingUsageState) { - // Bucket 1 should be moved - addReady(_ready.bucket(2)); - _bmj->recompute(); - EXPECT_FALSE(_bmj->done()); - // Note: This depends on _bmj->run() moving max 1 documents - EXPECT_FALSE(_bmj->run()); - sync(); - EXPECT_EQ(1u, docsMoved().size()); - EXPECT_EQ(0u, bucketsModified().size()); - // Notify that we've over limit, but not over adjusted limit - _diskMemUsageNotifier.notify(blockingUsageState); - EXPECT_FALSE(_bmj->run()); - sync(); - EXPECT_EQ(2u, docsMoved().size()); - EXPECT_EQ(0u, bucketsModified().size()); - } -}; - -struct ResourceLimitControllerFixture_1_2 : public ResourceLimitControllerFixture { - ResourceLimitControllerFixture_1_2() : ResourceLimitControllerFixture(1.2) {} -}; - -TEST_F(ResourceLimitControllerFixture, require_that_bucket_move_stops_when_disk_limit_is_reached) -{ - testJobStopping(DiskMemUsageState(ResourceUsageState(0.7, 0.8), ResourceUsageState())); -} - -TEST_F(ResourceLimitControllerFixture, require_that_bucket_move_stops_when_memory_limit_is_reached) -{ - testJobStopping(DiskMemUsageState(ResourceUsageState(), ResourceUsageState(0.7, 0.8))); -} - -TEST_F(ResourceLimitControllerFixture_1_2, require_that_bucket_move_uses_resource_limit_factor_for_disk_resource_limit) -{ - testJobNotStopping(DiskMemUsageState(ResourceUsageState(0.7, 0.8), ResourceUsageState())); -} - -TEST_F(ResourceLimitControllerFixture_1_2, require_that_bucket_move_uses_resource_limit_factor_for_memory_resource_limit) -{ - testJobNotStopping(DiskMemUsageState(ResourceUsageState(), ResourceUsageState(0.7, 0.8))); -} - - -struct MaxOutstandingMoveOpsFixture : public ControllerFixtureBase -{ - MaxOutstandingMoveOpsFixture(uint32_t maxOutstandingOps) - : ControllerFixtureBase(BlockableMaintenanceJobConfig(RESOURCE_LIMIT_FACTOR, maxOutstandingOps), true) - { - _builder.createDocs(1, 1, 2); - _builder.createDocs(2, 2, 3); - _builder.createDocs(3, 3, 4); - _builder.createDocs(4, 4, 5); - _ready.insertDocs(_builder.getDocs()); - _builder.clearDocs(); - _builder.createDocs(11, 1, 2); - _builder.createDocs(12, 2, 3); - _builder.createDocs(13, 3, 4); - _builder.createDocs(14, 4, 5); - _notReady.insertDocs(_builder.getDocs()); - addReady(_ready.bucket(3)); - _bmj->recompute(); - } - - void assertRunToBlocked() { - EXPECT_TRUE(_bmj->run()); // job becomes blocked as max outstanding limit is reached - EXPECT_FALSE(_bmj->done()); - EXPECT_TRUE(_bmj->isBlocked()); - EXPECT_TRUE(_bmj->isBlocked(BlockedReason::OUTSTANDING_OPS)); - } - void assertRunToNotBlocked() { - EXPECT_FALSE(_bmj->run()); - EXPECT_FALSE(_bmj->done()); - EXPECT_FALSE(_bmj->isBlocked()); - } - void assertRunToFinished() { - EXPECT_TRUE(_bmj->run()); - EXPECT_TRUE(_bmj->done()); - EXPECT_FALSE(_bmj->isBlocked()); - } - void assertDocsMoved(uint32_t expDocsMovedCnt, uint32_t expMoveContextsCnt) { - EXPECT_EQ(expDocsMovedCnt, docsMoved().size()); - EXPECT_EQ(expMoveContextsCnt, _moveHandler._moveDoneContexts.size()); - } - void unblockJob(uint32_t expRunnerCnt) { - _moveHandler.clearMoveDoneContexts(); // unblocks job and try to execute it via runner - EXPECT_EQ(expRunnerCnt, _runner.runCount); - EXPECT_FALSE(_bmj->isBlocked()); - } -}; - -struct MaxOutstandingMoveOpsFixture_1 : public MaxOutstandingMoveOpsFixture { - MaxOutstandingMoveOpsFixture_1() : MaxOutstandingMoveOpsFixture(1) {} -}; - -struct MaxOutstandingMoveOpsFixture_2 : public MaxOutstandingMoveOpsFixture { - MaxOutstandingMoveOpsFixture_2() : MaxOutstandingMoveOpsFixture(2) {} -}; - -TEST_F(MaxOutstandingMoveOpsFixture_1, require_that_bucket_move_job_is_blocked_if_it_has_too_many_outstanding_move_operations_max_1) -{ - assertRunToBlocked(); - sync(); - assertDocsMoved(1, 1); - assertRunToBlocked(); - assertDocsMoved(1, 1); - - unblockJob(1); - assertRunToBlocked(); - sync(); - assertDocsMoved(2, 1); - - unblockJob(2); - assertRunToBlocked(); - sync(); - assertDocsMoved(3, 1); - - unblockJob(3); - assertRunToFinished(); - sync(); - assertDocsMoved(3, 0); -} - -TEST_F(MaxOutstandingMoveOpsFixture_2, require_that_bucket_move_job_is_blocked_if_it_has_too_many_outstanding_move_operations_max_2) -{ - assertRunToNotBlocked(); - sync(); - assertDocsMoved(1, 1); - - assertRunToBlocked(); - sync(); - assertDocsMoved(2, 2); - - unblockJob(1); - assertRunToFinished(); - sync(); - assertDocsMoved(3, 1); -} - -GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index 8040b846310..7d4d6868d21 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -5,7 +5,7 @@ vespa_add_library(searchcore_server STATIC bootstrapconfig.cpp bootstrapconfigmanager.cpp buckethandler.cpp - bucketmovejobv2.cpp + bucketmovejob.cpp clusterstatehandler.cpp combiningfeedview.cpp ddbstate.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp new file mode 100644 index 00000000000..a43fd55ba02 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp @@ -0,0 +1,462 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bucketmovejob.h" +#include "imaintenancejobrunner.h" +#include "ibucketstatechangednotifier.h" +#include "iclusterstatechangednotifier.h" +#include "i_disk_mem_usage_notifier.h" +#include "ibucketmodifiedhandler.h" +#include "move_operation_limiter.h" +#include "document_db_maintenance_config.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +LOG_SETUP(".proton.server.bucketmovejob"); + +using document::BucketId; +using storage::spi::BucketInfo; +using storage::spi::Bucket; +using proton::bucketdb::BucketMover; +using vespalib::makeLambdaTask; +using vespalib::Trinary; + +namespace proton { + +namespace { + +const char * +toStr(bool v) { + return (v ? "T" : "F"); +} + +const char * +toStr(Trinary v) { + return (v == Trinary::True) ? "T" : ((v == Trinary::False) ? "F" : "U"); +} + +bool +blockedDueToClusterState(const std::shared_ptr &calc) +{ + bool clusterUp = calc && calc->clusterUp(); + bool nodeUp = calc && calc->nodeUp(); + bool nodeInitializing = calc && calc->nodeInitializing(); + return !(clusterUp && nodeUp && !nodeInitializing); +} + +} + +BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr &calc, + RetainGuard dbRetainer, + IDocumentMoveHandler &moveHandler, + IBucketModifiedHandler &modifiedHandler, + IThreadService & master, + BucketExecutor & bucketExecutor, + const MaintenanceDocumentSubDB &ready, + const MaintenanceDocumentSubDB ¬Ready, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + const vespalib::string &docTypeName, + document::BucketSpace bucketSpace) + : BlockableMaintenanceJob("move_buckets." + docTypeName, vespalib::duration::zero(), vespalib::duration::zero(), blockableConfig), + IClusterStateChangedHandler(), + bucketdb::IBucketCreateListener(), + IBucketStateChangedHandler(), + IDiskMemUsageListener(), + std::enable_shared_from_this(), + _calc(calc), + _dbRetainer(std::move(dbRetainer)), + _moveHandler(moveHandler), + _modifiedHandler(modifiedHandler), + _master(master), + _bucketExecutor(bucketExecutor), + _ready(ready), + _notReady(notReady), + _bucketSpace(bucketSpace), + _iterateCount(0), + _movers(), + _bucketsInFlight(), + _buckets2Move(), + _bucketsPending(0), + _bucketCreateNotifier(bucketCreateNotifier), + _clusterStateChangedNotifier(clusterStateChangedNotifier), + _bucketStateChangedNotifier(bucketStateChangedNotifier), + _diskMemUsageNotifier(diskMemUsageNotifier) +{ + _movers.reserve(std::min(100u, blockableConfig.getMaxOutstandingMoveOps())); + if (blockedDueToClusterState(_calc)) { + setBlocked(BlockedReason::CLUSTER_STATE); + } + + _bucketCreateNotifier.addListener(this); + _clusterStateChangedNotifier.addClusterStateChangedHandler(this); + _bucketStateChangedNotifier.addBucketStateChangedHandler(this); + _diskMemUsageNotifier.addDiskMemUsageListener(this); + recompute(_ready.meta_store()->getBucketDB().takeGuard()); +} + +BucketMoveJobV2::~BucketMoveJobV2() +{ + _bucketCreateNotifier.removeListener(this); + _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); + _bucketStateChangedNotifier.removeBucketStateChangedHandler(this); + _diskMemUsageNotifier.removeDiskMemUsageListener(this); +} + +std::shared_ptr +BucketMoveJobV2::create(const std::shared_ptr &calc, + RetainGuard dbRetainer, + IDocumentMoveHandler &moveHandler, + IBucketModifiedHandler &modifiedHandler, + IThreadService & master, + BucketExecutor & bucketExecutor, + const MaintenanceDocumentSubDB &ready, + const MaintenanceDocumentSubDB ¬Ready, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + const vespalib::string &docTypeName, + document::BucketSpace bucketSpace) +{ + return std::shared_ptr( + new BucketMoveJobV2(calc, std::move(dbRetainer), moveHandler, modifiedHandler, master, bucketExecutor, ready, notReady, + bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier, + diskMemUsageNotifier, blockableConfig, docTypeName, bucketSpace), + [&master](auto job) { + auto failed = master.execute(makeLambdaTask([job]() { delete job; })); + assert(!failed); + }); +} + +BucketMoveJobV2::NeedResult +BucketMoveJobV2::needMove(const ScanIterator &itr) const { + NeedResult noMove(false, false); + const bool hasReadyDocs = itr.hasReadyBucketDocs(); + const bool hasNotReadyDocs = itr.hasNotReadyBucketDocs(); + if (!hasReadyDocs && !hasNotReadyDocs) { + return noMove; // No documents for bucket in ready or notready subdbs + } + const bool isActive = itr.isActive(); + // No point in moving buckets when node is retired and everything will be deleted soon. + // However, allow moving of explicitly activated buckets, as this implies a lack of other good replicas. + if (!_calc || (_calc->nodeRetired() && !isActive)) { + return noMove; + } + const Trinary shouldBeReady = _calc->shouldBeReady(document::Bucket(_bucketSpace, itr.getBucket())); + if (shouldBeReady == Trinary::Undefined) { + return noMove; + } + const bool wantReady = (shouldBeReady == Trinary::True) || isActive; + LOG(spam, "checkBucket(): bucket(%s), shouldBeReady(%s), active(%s)", + itr.getBucket().toString().c_str(), toStr(shouldBeReady), toStr(isActive)); + if (wantReady) { + if (!hasNotReadyDocs) { + return noMove; // No notready bucket to make ready + } + } else { + if (isActive) { + return noMove; // Do not move rom ready to not ready when active + } + if (!hasReadyDocs) { + return noMove; // No ready bucket to make notready + } + } + return {true, wantReady}; +} + +class BucketMoveJobV2::StartMove : public storage::spi::BucketTask { +public: + using IDestructorCallbackSP = std::shared_ptr; + StartMove(std::shared_ptr job, BucketMover::MoveKeys keys, IDestructorCallbackSP opsTracker) + : _job(job), + _keys(std::move(keys)), + _opsTracker(std::move(opsTracker)) + {} + + void run(const Bucket &bucket, IDestructorCallbackSP onDone) override { + assert(_keys.mover().getBucket() == bucket.getBucketId()); + using DoneContext = vespalib::KeepAlive>; + BucketMoveJobV2::prepareMove(std::move(_job), std::move(_keys), + std::make_shared(std::make_pair(std::move(_opsTracker), std::move(onDone)))); + } + + void fail(const Bucket &bucket) override { + BucketMoveJobV2::failOperation(std::move(_job), bucket.getBucketId()); + } + +private: + std::shared_ptr _job; + BucketMover::MoveKeys _keys; + IDestructorCallbackSP _opsTracker; +}; + +void +BucketMoveJobV2::failOperation(std::shared_ptr job, BucketId bucketId) { + auto & master = job->_master; + if (job->stopped()) return; + master.execute(makeLambdaTask([job=std::move(job), bucketId]() { + if (job->stopped()) return; + job->considerBucket(job->_ready.meta_store()->getBucketDB().takeGuard(), bucketId); + })); +} + +void +BucketMoveJobV2::startMove(BucketMover & mover, size_t maxDocsToMove) { + auto [keys, done] = mover.getKeysToMove(maxDocsToMove); + if (done) { + mover.setAllScheduled(); + } + if (keys.empty()) return; + mover.updateLastValidGid(keys.back()._gid); + Bucket spiBucket(document::Bucket(_bucketSpace, mover.getBucket())); + auto bucketTask = std::make_unique(shared_from_this(), std::move(keys), getLimiter().beginOperation()); + _bucketExecutor.execute(spiBucket, std::move(bucketTask)); +} + +void +BucketMoveJobV2::prepareMove(std::shared_ptr job, BucketMover::MoveKeys keys, IDestructorCallbackSP onDone) +{ + if (job->stopped()) return; //TODO Remove once lidtracker is no longer in use. + auto moveOps = keys.createMoveOperations(); + auto & master = job->_master; + if (job->stopped()) return; + master.execute(makeLambdaTask([job=std::move(job), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable { + if (job->stopped()) return; + job->completeMove(std::move(moveOps), std::move(onDone)); + })); +} + +void +BucketMoveJobV2::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) { + BucketMover & mover = ops.mover(); + mover.moveDocuments(std::move(ops.success()), std::move(onDone)); + ops.failed().clear(); + if (checkIfMoverComplete(mover)) { + reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), mover.getBucket()); + } +} + +bool +BucketMoveJobV2::checkIfMoverComplete(const BucketMover & mover) { + bool bucketMoveComplete = mover.allScheduled() && mover.inSync(); + bool needReschedule = mover.needReschedule(); + if (bucketMoveComplete || needReschedule) { + BucketId bucket = mover.getBucket(); + auto found = _bucketsInFlight.find(bucket); + if (needReschedule) { + if ((found != _bucketsInFlight.end()) && (&mover == found->second.get())) { + //Prevent old disconnected mover from creating havoc. + _bucketsInFlight.erase(found); + _movers.erase(std::remove_if(_movers.begin(), _movers.end(), + [bucket](const BucketMoverSP &cand) { + return cand->getBucket() == bucket; + }), + _movers.end()); + return true; + } + } else { + assert(found != _bucketsInFlight.end()); + _bucketsInFlight.erase(found); + _modifiedHandler.notifyBucketModified(bucket); + } + } + updatePending(); + return false; +} + +void +BucketMoveJobV2::cancelBucket(BucketId bucket) { + auto inFlight = _bucketsInFlight.find(bucket); + if (inFlight != _bucketsInFlight.end()) { + inFlight->second->cancel(); + checkIfMoverComplete(*inFlight->second); + } +} + +void +BucketMoveJobV2::considerBucket(const bucketdb::Guard & guard, BucketId bucket) { + cancelBucket(bucket); + assert( !_bucketsInFlight.contains(bucket)); + reconsiderBucket(guard, bucket); +} + +void +BucketMoveJobV2::reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket) { + assert( ! _bucketsInFlight.contains(bucket)); + ScanIterator itr(guard, bucket); + auto [mustMove, wantReady] = needMove(itr); + if (mustMove) { + _buckets2Move[bucket] = wantReady; + } else { + _buckets2Move.erase(bucket); + } + updatePending(); + considerRun(); +} + +void +BucketMoveJobV2::notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) +{ + considerBucket(guard, bucket); +} + +BucketMoveJobV2::BucketMoveSet +BucketMoveJobV2::computeBuckets2Move(const bucketdb::Guard & guard) +{ + BucketMoveJobV2::BucketMoveSet toMove; + for (ScanIterator itr(guard, BucketId()); itr.valid(); ++itr) { + auto [mustMove, wantReady] = needMove(itr); + if (mustMove) { + toMove[itr.getBucket()] = wantReady; + } + } + return toMove; +} + +std::shared_ptr +BucketMoveJobV2::createMover(BucketId bucket, bool wantReady) { + const MaintenanceDocumentSubDB &source(wantReady ? _notReady : _ready); + const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady); + LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)", + bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id()); + return BucketMover::create(bucket, &source, target.sub_db_id(), _moveHandler); +} + +std::shared_ptr +BucketMoveJobV2::greedyCreateMover() { + if ( ! _buckets2Move.empty()) { + auto next = _buckets2Move.begin(); + auto mover = createMover(next->first, next->second); + _buckets2Move.erase(next); + return mover; + } + return {}; +} + +void +BucketMoveJobV2::moveDocs(size_t maxDocsToMove) { + backFillMovers(); + if (_movers.empty()) return; + + // Select mover + size_t index = _iterateCount++ % _movers.size(); + auto & mover = *_movers[index]; + + //Move, or reduce movers as we are tailing off + if (!mover.allScheduled()) { + startMove(mover, maxDocsToMove); + if (mover.allScheduled()) { + _movers.erase(_movers.begin() + index); + } + } +} + +bool +BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket) { + for (size_t i(0); i < maxBuckets2Move; i++) { + moveDocs(maxDocsToMovePerBucket); + } + return isBlocked() || done(); +} + +bool +BucketMoveJobV2::done() const { + return _buckets2Move.empty() && _movers.empty() && !isBlocked(); +} + +bool +BucketMoveJobV2::run() +{ + if (isBlocked()) { + return true; // indicate work is done, since node state is bad + } + /// Returning false here will immediately post the job back on the executor. This will give a busy loop, + /// but this is considered fine as it is very rare and it will be intermingled with multiple feed operations. + if ( ! scanAndMove(1, 1) ) { + return false; + } + + if (isBlocked(BlockedReason::OUTSTANDING_OPS)) { + return true; + } + return done(); +} + +void +BucketMoveJobV2::recompute() { + recompute(_ready.meta_store()->getBucketDB().takeGuard()); +} +void +BucketMoveJobV2::recompute(const bucketdb::Guard & guard) { + _buckets2Move = computeBuckets2Move(guard); + updatePending(); +} + +void +BucketMoveJobV2::backFillMovers() { + // Ensure we have enough movers. + while ( ! _buckets2Move.empty() && (_movers.size() < _movers.capacity())) { + auto mover = greedyCreateMover(); + _movers.push_back(mover); + auto bucketId = mover->getBucket(); + assert( ! _bucketsInFlight.contains(bucketId)); + _bucketsInFlight[bucketId] = std::move(mover); + } + updatePending(); +} + +void +BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr &newCalc) +{ + // Called by master write thread + _calc = newCalc; + if (blockedDueToClusterState(_calc)) { + setBlocked(BlockedReason::CLUSTER_STATE); + } else { + unBlock(BlockedReason::CLUSTER_STATE); + _movers.clear(); + std::for_each(_bucketsInFlight.begin(), _bucketsInFlight.end(), [](auto & entry) { entry.second->cancel();}); + _bucketsInFlight.clear(); + recompute(_ready.meta_store()->getBucketDB().takeGuard()); + } +} + +void +BucketMoveJobV2::notifyBucketStateChanged(const BucketId &bucketId, BucketInfo::ActiveState) +{ + // Called by master write thread + considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId); +} + +void +BucketMoveJobV2::notifyDiskMemUsage(DiskMemUsageState state) +{ + // Called by master write thread + internalNotifyDiskMemUsage(state); +} + +void +BucketMoveJobV2::updatePending() { + _bucketsPending.store(_bucketsInFlight.size() + _buckets2Move.size(), std::memory_order_relaxed); +} + +void +BucketMoveJobV2::updateMetrics(DocumentDBTaggedMetrics & metrics) const { + // This is an over estimate to ensure we do not count down to zero until everything has been and completed and acked. + metrics.bucketMove.bucketsPending.set(_bucketsPending.load(std::memory_order_relaxed) + + getLimiter().numPending()); +} + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h new file mode 100644 index 00000000000..7d5dafb33b3 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h @@ -0,0 +1,147 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "blockable_maintenance_job.h" +#include "documentbucketmover.h" +#include "i_disk_mem_usage_listener.h" +#include "ibucketstatechangedhandler.h" +#include "iclusterstatechangedhandler.h" +#include "maintenancedocumentsubdb.h" +#include +#include +#include + + +namespace storage::spi { struct BucketExecutor; } +namespace searchcorespi::index { struct IThreadService; } + +namespace proton { + +class BlockableMaintenanceJobConfig; +class IBucketStateChangedNotifier; +class IClusterStateChangedNotifier; +class IDiskMemUsageNotifier; +class IBucketModifiedHandler; + +namespace bucketdb { class IBucketCreateNotifier; } + +/** + * Class used to control the moving of buckets between the ready and + * not ready sub databases based on the readiness of buckets according to the cluster state. + * It will first compute the set of buckets to be moved. Then N of these buckets will be iterated in parallel and + * the documents scheduled for move. The movment will happen in 3 phases. + * 1 - Collect meta info for documents. Must happend in master thread + * 2 - Acquire bucket lock and fetch documents and very against meta data. This is done in BucketExecutor threads. + * 3 - Actual movement is then done in master thread while still holding bucket lock. Once bucket has fully moved + * bucket modified notification is sent. + */ +class BucketMoveJobV2 : public BlockableMaintenanceJob, + public IClusterStateChangedHandler, + public bucketdb::IBucketCreateListener, + public IBucketStateChangedHandler, + public IDiskMemUsageListener, + public std::enable_shared_from_this +{ +private: + using BucketExecutor = storage::spi::BucketExecutor; + using IDestructorCallback = vespalib::IDestructorCallback; + using IDestructorCallbackSP = std::shared_ptr; + using IThreadService = searchcorespi::index::IThreadService; + using BucketId = document::BucketId; + using ScanIterator = bucketdb::ScanIterator; + using BucketMoveSet = std::map; + using NeedResult = std::pair; + using ActiveState = storage::spi::BucketInfo::ActiveState; + using BucketMover = bucketdb::BucketMover; + using BucketMoverSP = std::shared_ptr; + using Bucket2Mover = std::map; + using Movers = std::vector; + using GuardedMoveOps = BucketMover::GuardedMoveOps; + std::shared_ptr _calc; + RetainGuard _dbRetainer; + IDocumentMoveHandler &_moveHandler; + IBucketModifiedHandler &_modifiedHandler; + IThreadService &_master; + BucketExecutor &_bucketExecutor; + const MaintenanceDocumentSubDB _ready; + const MaintenanceDocumentSubDB _notReady; + const document::BucketSpace _bucketSpace; + size_t _iterateCount; + Movers _movers; + Bucket2Mover _bucketsInFlight; + BucketMoveSet _buckets2Move; + + std::atomic _bucketsPending; + + bucketdb::IBucketCreateNotifier &_bucketCreateNotifier; + IClusterStateChangedNotifier &_clusterStateChangedNotifier; + IBucketStateChangedNotifier &_bucketStateChangedNotifier; + IDiskMemUsageNotifier &_diskMemUsageNotifier; + + BucketMoveJobV2(const std::shared_ptr &calc, + RetainGuard dbRetainer, + IDocumentMoveHandler &moveHandler, + IBucketModifiedHandler &modifiedHandler, + IThreadService & master, + BucketExecutor & bucketExecutor, + const MaintenanceDocumentSubDB &ready, + const MaintenanceDocumentSubDB ¬Ready, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + const vespalib::string &docTypeName, + document::BucketSpace bucketSpace); + + void startMove(BucketMover & mover, size_t maxDocsToMove); + static void prepareMove(std::shared_ptr job, BucketMover::MoveKeys keys, IDestructorCallbackSP context); + void completeMove(GuardedMoveOps moveOps, IDestructorCallbackSP context); + bool checkIfMoverComplete(const BucketMover & mover); + void considerBucket(const bucketdb::Guard & guard, BucketId bucket); + void reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket); + void updatePending(); + void cancelBucket(BucketId bucket); // True if something to cancel + NeedResult needMove(const ScanIterator &itr) const; + BucketMoveSet computeBuckets2Move(const bucketdb::Guard & guard); + BucketMoverSP createMover(BucketId bucket, bool wantReady); + BucketMoverSP greedyCreateMover(); + void backFillMovers(); + void moveDocs(size_t maxDocsToMove); + static void failOperation(std::shared_ptr job, BucketId bucket); + void recompute(const bucketdb::Guard & guard); + class StartMove; +public: + static std::shared_ptr + create(const std::shared_ptr &calc, + RetainGuard dbRetainer, + IDocumentMoveHandler &moveHandler, + IBucketModifiedHandler &modifiedHandler, + IThreadService & master, + BucketExecutor & bucketExecutor, + const MaintenanceDocumentSubDB &ready, + const MaintenanceDocumentSubDB ¬Ready, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + const vespalib::string &docTypeName, + document::BucketSpace bucketSpace); + + ~BucketMoveJobV2() override; + + bool scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket); + bool done() const; + void recompute(); // Only for testing + + bool run() override; + void notifyClusterStateChanged(const std::shared_ptr &newCalc) override; + void notifyBucketStateChanged(const BucketId &bucketId, ActiveState newState) override; + void notifyDiskMemUsage(DiskMemUsageState state) override; + void notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) override; + void updateMetrics(DocumentDBTaggedMetrics & metrics) const override; +}; + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp deleted file mode 100644 index f2c6dc3a8be..00000000000 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp +++ /dev/null @@ -1,462 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "bucketmovejobv2.h" -#include "imaintenancejobrunner.h" -#include "ibucketstatechangednotifier.h" -#include "iclusterstatechangednotifier.h" -#include "i_disk_mem_usage_notifier.h" -#include "ibucketmodifiedhandler.h" -#include "move_operation_limiter.h" -#include "document_db_maintenance_config.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -LOG_SETUP(".proton.server.bucketmovejob"); - -using document::BucketId; -using storage::spi::BucketInfo; -using storage::spi::Bucket; -using proton::bucketdb::BucketMover; -using vespalib::makeLambdaTask; -using vespalib::Trinary; - -namespace proton { - -namespace { - -const char * -toStr(bool v) { - return (v ? "T" : "F"); -} - -const char * -toStr(Trinary v) { - return (v == Trinary::True) ? "T" : ((v == Trinary::False) ? "F" : "U"); -} - -bool -blockedDueToClusterState(const std::shared_ptr &calc) -{ - bool clusterUp = calc && calc->clusterUp(); - bool nodeUp = calc && calc->nodeUp(); - bool nodeInitializing = calc && calc->nodeInitializing(); - return !(clusterUp && nodeUp && !nodeInitializing); -} - -} - -BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr &calc, - RetainGuard dbRetainer, - IDocumentMoveHandler &moveHandler, - IBucketModifiedHandler &modifiedHandler, - IThreadService & master, - BucketExecutor & bucketExecutor, - const MaintenanceDocumentSubDB &ready, - const MaintenanceDocumentSubDB ¬Ready, - bucketdb::IBucketCreateNotifier &bucketCreateNotifier, - IClusterStateChangedNotifier &clusterStateChangedNotifier, - IBucketStateChangedNotifier &bucketStateChangedNotifier, - IDiskMemUsageNotifier &diskMemUsageNotifier, - const BlockableMaintenanceJobConfig &blockableConfig, - const vespalib::string &docTypeName, - document::BucketSpace bucketSpace) - : BlockableMaintenanceJob("move_buckets." + docTypeName, vespalib::duration::zero(), vespalib::duration::zero(), blockableConfig), - IClusterStateChangedHandler(), - bucketdb::IBucketCreateListener(), - IBucketStateChangedHandler(), - IDiskMemUsageListener(), - std::enable_shared_from_this(), - _calc(calc), - _dbRetainer(std::move(dbRetainer)), - _moveHandler(moveHandler), - _modifiedHandler(modifiedHandler), - _master(master), - _bucketExecutor(bucketExecutor), - _ready(ready), - _notReady(notReady), - _bucketSpace(bucketSpace), - _iterateCount(0), - _movers(), - _bucketsInFlight(), - _buckets2Move(), - _bucketsPending(0), - _bucketCreateNotifier(bucketCreateNotifier), - _clusterStateChangedNotifier(clusterStateChangedNotifier), - _bucketStateChangedNotifier(bucketStateChangedNotifier), - _diskMemUsageNotifier(diskMemUsageNotifier) -{ - _movers.reserve(std::min(100u, blockableConfig.getMaxOutstandingMoveOps())); - if (blockedDueToClusterState(_calc)) { - setBlocked(BlockedReason::CLUSTER_STATE); - } - - _bucketCreateNotifier.addListener(this); - _clusterStateChangedNotifier.addClusterStateChangedHandler(this); - _bucketStateChangedNotifier.addBucketStateChangedHandler(this); - _diskMemUsageNotifier.addDiskMemUsageListener(this); - recompute(_ready.meta_store()->getBucketDB().takeGuard()); -} - -BucketMoveJobV2::~BucketMoveJobV2() -{ - _bucketCreateNotifier.removeListener(this); - _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); - _bucketStateChangedNotifier.removeBucketStateChangedHandler(this); - _diskMemUsageNotifier.removeDiskMemUsageListener(this); -} - -std::shared_ptr -BucketMoveJobV2::create(const std::shared_ptr &calc, - RetainGuard dbRetainer, - IDocumentMoveHandler &moveHandler, - IBucketModifiedHandler &modifiedHandler, - IThreadService & master, - BucketExecutor & bucketExecutor, - const MaintenanceDocumentSubDB &ready, - const MaintenanceDocumentSubDB ¬Ready, - bucketdb::IBucketCreateNotifier &bucketCreateNotifier, - IClusterStateChangedNotifier &clusterStateChangedNotifier, - IBucketStateChangedNotifier &bucketStateChangedNotifier, - IDiskMemUsageNotifier &diskMemUsageNotifier, - const BlockableMaintenanceJobConfig &blockableConfig, - const vespalib::string &docTypeName, - document::BucketSpace bucketSpace) -{ - return std::shared_ptr( - new BucketMoveJobV2(calc, std::move(dbRetainer), moveHandler, modifiedHandler, master, bucketExecutor, ready, notReady, - bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier, - diskMemUsageNotifier, blockableConfig, docTypeName, bucketSpace), - [&master](auto job) { - auto failed = master.execute(makeLambdaTask([job]() { delete job; })); - assert(!failed); - }); -} - -BucketMoveJobV2::NeedResult -BucketMoveJobV2::needMove(const ScanIterator &itr) const { - NeedResult noMove(false, false); - const bool hasReadyDocs = itr.hasReadyBucketDocs(); - const bool hasNotReadyDocs = itr.hasNotReadyBucketDocs(); - if (!hasReadyDocs && !hasNotReadyDocs) { - return noMove; // No documents for bucket in ready or notready subdbs - } - const bool isActive = itr.isActive(); - // No point in moving buckets when node is retired and everything will be deleted soon. - // However, allow moving of explicitly activated buckets, as this implies a lack of other good replicas. - if (!_calc || (_calc->nodeRetired() && !isActive)) { - return noMove; - } - const Trinary shouldBeReady = _calc->shouldBeReady(document::Bucket(_bucketSpace, itr.getBucket())); - if (shouldBeReady == Trinary::Undefined) { - return noMove; - } - const bool wantReady = (shouldBeReady == Trinary::True) || isActive; - LOG(spam, "checkBucket(): bucket(%s), shouldBeReady(%s), active(%s)", - itr.getBucket().toString().c_str(), toStr(shouldBeReady), toStr(isActive)); - if (wantReady) { - if (!hasNotReadyDocs) { - return noMove; // No notready bucket to make ready - } - } else { - if (isActive) { - return noMove; // Do not move rom ready to not ready when active - } - if (!hasReadyDocs) { - return noMove; // No ready bucket to make notready - } - } - return {true, wantReady}; -} - -class BucketMoveJobV2::StartMove : public storage::spi::BucketTask { -public: - using IDestructorCallbackSP = std::shared_ptr; - StartMove(std::shared_ptr job, BucketMover::MoveKeys keys, IDestructorCallbackSP opsTracker) - : _job(job), - _keys(std::move(keys)), - _opsTracker(std::move(opsTracker)) - {} - - void run(const Bucket &bucket, IDestructorCallbackSP onDone) override { - assert(_keys.mover().getBucket() == bucket.getBucketId()); - using DoneContext = vespalib::KeepAlive>; - BucketMoveJobV2::prepareMove(std::move(_job), std::move(_keys), - std::make_shared(std::make_pair(std::move(_opsTracker), std::move(onDone)))); - } - - void fail(const Bucket &bucket) override { - BucketMoveJobV2::failOperation(std::move(_job), bucket.getBucketId()); - } - -private: - std::shared_ptr _job; - BucketMover::MoveKeys _keys; - IDestructorCallbackSP _opsTracker; -}; - -void -BucketMoveJobV2::failOperation(std::shared_ptr job, BucketId bucketId) { - auto & master = job->_master; - if (job->stopped()) return; - master.execute(makeLambdaTask([job=std::move(job), bucketId]() { - if (job->stopped()) return; - job->considerBucket(job->_ready.meta_store()->getBucketDB().takeGuard(), bucketId); - })); -} - -void -BucketMoveJobV2::startMove(BucketMover & mover, size_t maxDocsToMove) { - auto [keys, done] = mover.getKeysToMove(maxDocsToMove); - if (done) { - mover.setAllScheduled(); - } - if (keys.empty()) return; - mover.updateLastValidGid(keys.back()._gid); - Bucket spiBucket(document::Bucket(_bucketSpace, mover.getBucket())); - auto bucketTask = std::make_unique(shared_from_this(), std::move(keys), getLimiter().beginOperation()); - _bucketExecutor.execute(spiBucket, std::move(bucketTask)); -} - -void -BucketMoveJobV2::prepareMove(std::shared_ptr job, BucketMover::MoveKeys keys, IDestructorCallbackSP onDone) -{ - if (job->stopped()) return; //TODO Remove once lidtracker is no longer in use. - auto moveOps = keys.createMoveOperations(); - auto & master = job->_master; - if (job->stopped()) return; - master.execute(makeLambdaTask([job=std::move(job), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable { - if (job->stopped()) return; - job->completeMove(std::move(moveOps), std::move(onDone)); - })); -} - -void -BucketMoveJobV2::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) { - BucketMover & mover = ops.mover(); - mover.moveDocuments(std::move(ops.success()), std::move(onDone)); - ops.failed().clear(); - if (checkIfMoverComplete(mover)) { - reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), mover.getBucket()); - } -} - -bool -BucketMoveJobV2::checkIfMoverComplete(const BucketMover & mover) { - bool bucketMoveComplete = mover.allScheduled() && mover.inSync(); - bool needReschedule = mover.needReschedule(); - if (bucketMoveComplete || needReschedule) { - BucketId bucket = mover.getBucket(); - auto found = _bucketsInFlight.find(bucket); - if (needReschedule) { - if ((found != _bucketsInFlight.end()) && (&mover == found->second.get())) { - //Prevent old disconnected mover from creating havoc. - _bucketsInFlight.erase(found); - _movers.erase(std::remove_if(_movers.begin(), _movers.end(), - [bucket](const BucketMoverSP &cand) { - return cand->getBucket() == bucket; - }), - _movers.end()); - return true; - } - } else { - assert(found != _bucketsInFlight.end()); - _bucketsInFlight.erase(found); - _modifiedHandler.notifyBucketModified(bucket); - } - } - updatePending(); - return false; -} - -void -BucketMoveJobV2::cancelBucket(BucketId bucket) { - auto inFlight = _bucketsInFlight.find(bucket); - if (inFlight != _bucketsInFlight.end()) { - inFlight->second->cancel(); - checkIfMoverComplete(*inFlight->second); - } -} - -void -BucketMoveJobV2::considerBucket(const bucketdb::Guard & guard, BucketId bucket) { - cancelBucket(bucket); - assert( !_bucketsInFlight.contains(bucket)); - reconsiderBucket(guard, bucket); -} - -void -BucketMoveJobV2::reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket) { - assert( ! _bucketsInFlight.contains(bucket)); - ScanIterator itr(guard, bucket); - auto [mustMove, wantReady] = needMove(itr); - if (mustMove) { - _buckets2Move[bucket] = wantReady; - } else { - _buckets2Move.erase(bucket); - } - updatePending(); - considerRun(); -} - -void -BucketMoveJobV2::notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) -{ - considerBucket(guard, bucket); -} - -BucketMoveJobV2::BucketMoveSet -BucketMoveJobV2::computeBuckets2Move(const bucketdb::Guard & guard) -{ - BucketMoveJobV2::BucketMoveSet toMove; - for (ScanIterator itr(guard, BucketId()); itr.valid(); ++itr) { - auto [mustMove, wantReady] = needMove(itr); - if (mustMove) { - toMove[itr.getBucket()] = wantReady; - } - } - return toMove; -} - -std::shared_ptr -BucketMoveJobV2::createMover(BucketId bucket, bool wantReady) { - const MaintenanceDocumentSubDB &source(wantReady ? _notReady : _ready); - const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady); - LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)", - bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id()); - return BucketMover::create(bucket, &source, target.sub_db_id(), _moveHandler); -} - -std::shared_ptr -BucketMoveJobV2::greedyCreateMover() { - if ( ! _buckets2Move.empty()) { - auto next = _buckets2Move.begin(); - auto mover = createMover(next->first, next->second); - _buckets2Move.erase(next); - return mover; - } - return {}; -} - -void -BucketMoveJobV2::moveDocs(size_t maxDocsToMove) { - backFillMovers(); - if (_movers.empty()) return; - - // Select mover - size_t index = _iterateCount++ % _movers.size(); - auto & mover = *_movers[index]; - - //Move, or reduce movers as we are tailing off - if (!mover.allScheduled()) { - startMove(mover, maxDocsToMove); - if (mover.allScheduled()) { - _movers.erase(_movers.begin() + index); - } - } -} - -bool -BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket) { - for (size_t i(0); i < maxBuckets2Move; i++) { - moveDocs(maxDocsToMovePerBucket); - } - return isBlocked() || done(); -} - -bool -BucketMoveJobV2::done() const { - return _buckets2Move.empty() && _movers.empty() && !isBlocked(); -} - -bool -BucketMoveJobV2::run() -{ - if (isBlocked()) { - return true; // indicate work is done, since node state is bad - } - /// Returning false here will immediately post the job back on the executor. This will give a busy loop, - /// but this is considered fine as it is very rare and it will be intermingled with multiple feed operations. - if ( ! scanAndMove(1, 1) ) { - return false; - } - - if (isBlocked(BlockedReason::OUTSTANDING_OPS)) { - return true; - } - return done(); -} - -void -BucketMoveJobV2::recompute() { - recompute(_ready.meta_store()->getBucketDB().takeGuard()); -} -void -BucketMoveJobV2::recompute(const bucketdb::Guard & guard) { - _buckets2Move = computeBuckets2Move(guard); - updatePending(); -} - -void -BucketMoveJobV2::backFillMovers() { - // Ensure we have enough movers. - while ( ! _buckets2Move.empty() && (_movers.size() < _movers.capacity())) { - auto mover = greedyCreateMover(); - _movers.push_back(mover); - auto bucketId = mover->getBucket(); - assert( ! _bucketsInFlight.contains(bucketId)); - _bucketsInFlight[bucketId] = std::move(mover); - } - updatePending(); -} - -void -BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr &newCalc) -{ - // Called by master write thread - _calc = newCalc; - if (blockedDueToClusterState(_calc)) { - setBlocked(BlockedReason::CLUSTER_STATE); - } else { - unBlock(BlockedReason::CLUSTER_STATE); - _movers.clear(); - std::for_each(_bucketsInFlight.begin(), _bucketsInFlight.end(), [](auto & entry) { entry.second->cancel();}); - _bucketsInFlight.clear(); - recompute(_ready.meta_store()->getBucketDB().takeGuard()); - } -} - -void -BucketMoveJobV2::notifyBucketStateChanged(const BucketId &bucketId, BucketInfo::ActiveState) -{ - // Called by master write thread - considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId); -} - -void -BucketMoveJobV2::notifyDiskMemUsage(DiskMemUsageState state) -{ - // Called by master write thread - internalNotifyDiskMemUsage(state); -} - -void -BucketMoveJobV2::updatePending() { - _bucketsPending.store(_bucketsInFlight.size() + _buckets2Move.size(), std::memory_order_relaxed); -} - -void -BucketMoveJobV2::updateMetrics(DocumentDBTaggedMetrics & metrics) const { - // This is an over estimate to ensure we do not count down to zero until everything has been and completed and acked. - metrics.bucketMove.bucketsPending.set(_bucketsPending.load(std::memory_order_relaxed) + - getLimiter().numPending()); -} - -} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h deleted file mode 100644 index 7d5dafb33b3..00000000000 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "blockable_maintenance_job.h" -#include "documentbucketmover.h" -#include "i_disk_mem_usage_listener.h" -#include "ibucketstatechangedhandler.h" -#include "iclusterstatechangedhandler.h" -#include "maintenancedocumentsubdb.h" -#include -#include -#include - - -namespace storage::spi { struct BucketExecutor; } -namespace searchcorespi::index { struct IThreadService; } - -namespace proton { - -class BlockableMaintenanceJobConfig; -class IBucketStateChangedNotifier; -class IClusterStateChangedNotifier; -class IDiskMemUsageNotifier; -class IBucketModifiedHandler; - -namespace bucketdb { class IBucketCreateNotifier; } - -/** - * Class used to control the moving of buckets between the ready and - * not ready sub databases based on the readiness of buckets according to the cluster state. - * It will first compute the set of buckets to be moved. Then N of these buckets will be iterated in parallel and - * the documents scheduled for move. The movment will happen in 3 phases. - * 1 - Collect meta info for documents. Must happend in master thread - * 2 - Acquire bucket lock and fetch documents and very against meta data. This is done in BucketExecutor threads. - * 3 - Actual movement is then done in master thread while still holding bucket lock. Once bucket has fully moved - * bucket modified notification is sent. - */ -class BucketMoveJobV2 : public BlockableMaintenanceJob, - public IClusterStateChangedHandler, - public bucketdb::IBucketCreateListener, - public IBucketStateChangedHandler, - public IDiskMemUsageListener, - public std::enable_shared_from_this -{ -private: - using BucketExecutor = storage::spi::BucketExecutor; - using IDestructorCallback = vespalib::IDestructorCallback; - using IDestructorCallbackSP = std::shared_ptr; - using IThreadService = searchcorespi::index::IThreadService; - using BucketId = document::BucketId; - using ScanIterator = bucketdb::ScanIterator; - using BucketMoveSet = std::map; - using NeedResult = std::pair; - using ActiveState = storage::spi::BucketInfo::ActiveState; - using BucketMover = bucketdb::BucketMover; - using BucketMoverSP = std::shared_ptr; - using Bucket2Mover = std::map; - using Movers = std::vector; - using GuardedMoveOps = BucketMover::GuardedMoveOps; - std::shared_ptr _calc; - RetainGuard _dbRetainer; - IDocumentMoveHandler &_moveHandler; - IBucketModifiedHandler &_modifiedHandler; - IThreadService &_master; - BucketExecutor &_bucketExecutor; - const MaintenanceDocumentSubDB _ready; - const MaintenanceDocumentSubDB _notReady; - const document::BucketSpace _bucketSpace; - size_t _iterateCount; - Movers _movers; - Bucket2Mover _bucketsInFlight; - BucketMoveSet _buckets2Move; - - std::atomic _bucketsPending; - - bucketdb::IBucketCreateNotifier &_bucketCreateNotifier; - IClusterStateChangedNotifier &_clusterStateChangedNotifier; - IBucketStateChangedNotifier &_bucketStateChangedNotifier; - IDiskMemUsageNotifier &_diskMemUsageNotifier; - - BucketMoveJobV2(const std::shared_ptr &calc, - RetainGuard dbRetainer, - IDocumentMoveHandler &moveHandler, - IBucketModifiedHandler &modifiedHandler, - IThreadService & master, - BucketExecutor & bucketExecutor, - const MaintenanceDocumentSubDB &ready, - const MaintenanceDocumentSubDB ¬Ready, - bucketdb::IBucketCreateNotifier &bucketCreateNotifier, - IClusterStateChangedNotifier &clusterStateChangedNotifier, - IBucketStateChangedNotifier &bucketStateChangedNotifier, - IDiskMemUsageNotifier &diskMemUsageNotifier, - const BlockableMaintenanceJobConfig &blockableConfig, - const vespalib::string &docTypeName, - document::BucketSpace bucketSpace); - - void startMove(BucketMover & mover, size_t maxDocsToMove); - static void prepareMove(std::shared_ptr job, BucketMover::MoveKeys keys, IDestructorCallbackSP context); - void completeMove(GuardedMoveOps moveOps, IDestructorCallbackSP context); - bool checkIfMoverComplete(const BucketMover & mover); - void considerBucket(const bucketdb::Guard & guard, BucketId bucket); - void reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket); - void updatePending(); - void cancelBucket(BucketId bucket); // True if something to cancel - NeedResult needMove(const ScanIterator &itr) const; - BucketMoveSet computeBuckets2Move(const bucketdb::Guard & guard); - BucketMoverSP createMover(BucketId bucket, bool wantReady); - BucketMoverSP greedyCreateMover(); - void backFillMovers(); - void moveDocs(size_t maxDocsToMove); - static void failOperation(std::shared_ptr job, BucketId bucket); - void recompute(const bucketdb::Guard & guard); - class StartMove; -public: - static std::shared_ptr - create(const std::shared_ptr &calc, - RetainGuard dbRetainer, - IDocumentMoveHandler &moveHandler, - IBucketModifiedHandler &modifiedHandler, - IThreadService & master, - BucketExecutor & bucketExecutor, - const MaintenanceDocumentSubDB &ready, - const MaintenanceDocumentSubDB ¬Ready, - bucketdb::IBucketCreateNotifier &bucketCreateNotifier, - IClusterStateChangedNotifier &clusterStateChangedNotifier, - IBucketStateChangedNotifier &bucketStateChangedNotifier, - IDiskMemUsageNotifier &diskMemUsageNotifier, - const BlockableMaintenanceJobConfig &blockableConfig, - const vespalib::string &docTypeName, - document::BucketSpace bucketSpace); - - ~BucketMoveJobV2() override; - - bool scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket); - bool done() const; - void recompute(); // Only for testing - - bool run() override; - void notifyClusterStateChanged(const std::shared_ptr &newCalc) override; - void notifyBucketStateChanged(const BucketId &bucketId, ActiveState newState) override; - void notifyDiskMemUsage(DiskMemUsageState state) override; - void notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) override; - void updateMetrics(DocumentDBTaggedMetrics & metrics) const override; -}; - -} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index 026646d6c5a..e713335be4b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -1,6 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "bucketmovejobv2.h" +#include "bucketmovejob.h" #include "heart_beat_job.h" #include "job_tracked_maintenance_job.h" #include "lid_space_compaction_job.h" -- cgit v1.2.3