From 5247f841c003d9275ebe919fe8367f016ef24f38 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Tue, 3 Nov 2020 14:12:36 +0000 Subject: Make prioritization of global bucket merges live configurable Lets an application that is starving default space merges explicitly disable prioritization temporarily. This is a stop-gap before we add more robust and generalized handling of resource exhaustion scenarios. --- storage/src/tests/distributor/distributortest.cpp | 17 ++++++++++++++ .../src/tests/distributor/statecheckerstest.cpp | 26 ++++++++++++++++++++-- .../storage/config/distributorconfiguration.cpp | 2 ++ .../storage/config/distributorconfiguration.h | 8 +++++++ .../storage/config/stor-distributormanager.def | 9 ++++++++ .../vespa/storage/distributor/statecheckers.cpp | 4 +++- 6 files changed, 63 insertions(+), 3 deletions(-) (limited to 'storage') diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 235314dd38b..d5cf734ed0a 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -200,6 +200,12 @@ struct DistributorTest : Test, DistributorTestUtil { configureDistributor(builder); } + void configure_prioritize_global_bucket_merges(bool enabled) { + ConfigBuilder builder; + builder.prioritizeGlobalBucketMerges = enabled; + configureDistributor(builder); + } + void configureMaxClusterClockSkew(int seconds); void sendDownClusterStateCommand(); void replyToSingleRequestBucketInfoCommandWith1Bucket(); @@ -1171,4 +1177,15 @@ TEST_F(DistributorTest, closing_aborts_gets_started_outside_main_distributor_thr EXPECT_EQ(api::ReturnCode::ABORTED, _sender.reply(0)->getResult().getResult()); } +TEST_F(DistributorTest, prioritize_global_bucket_merges_config_is_propagated_to_internal_config) { + createLinks(); + setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); + + configure_prioritize_global_bucket_merges(true); + EXPECT_TRUE(getConfig().prioritize_global_bucket_merges()); + + configure_prioritize_global_bucket_merges(false); + EXPECT_FALSE(getConfig().prioritize_global_bucket_merges()); +} + } diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index ad95e14fe7c..0bb01af2ae1 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -171,6 +171,7 @@ struct StateCheckersTest : Test, DistributorTestUtil { bool _includeMessagePriority {false}; bool _includeSchedulingPriority {false}; bool _merge_operations_disabled {false}; + bool _prioritize_global_bucket_merges {true}; CheckerParams(); ~CheckerParams(); @@ -210,6 +211,10 @@ struct StateCheckersTest : Test, DistributorTestUtil { _merge_operations_disabled = disabled; return *this; } + CheckerParams& prioritize_global_bucket_merges(bool enabled) noexcept { + _prioritize_global_bucket_merges = enabled; + return *this; + } CheckerParams& bucket_space(document::BucketSpace bucket_space) noexcept { _bucket_space = bucket_space; return *this; @@ -226,6 +231,7 @@ struct StateCheckersTest : Test, DistributorTestUtil { setRedundancy(params._redundancy); enableDistributorClusterState(params._clusterState); getConfig().set_merge_operations_disabled(params._merge_operations_disabled); + getConfig().set_prioritize_global_bucket_merges(params._prioritize_global_bucket_merges); if (!params._pending_cluster_state.empty()) { auto cmd = std::make_shared(lib::ClusterState(params._pending_cluster_state)); _distributor->onDown(cmd); @@ -766,7 +772,7 @@ TEST_F(StateCheckersTest, synchronize_and_move) { .clusterState("distributor:1 storage:4")); } -TEST_F(StateCheckersTest, global_bucket_merges_have_high_priority) { +TEST_F(StateCheckersTest, global_bucket_merges_have_high_priority_if_prioritization_enabled) { runAndVerify( CheckerParams().expect( "[Synchronizing buckets with different checksums " @@ -777,7 +783,23 @@ TEST_F(StateCheckersTest, global_bucket_merges_have_high_priority) { .bucketInfo("0=1,1=2") .bucket_space(document::FixedBucketSpaces::global_space()) .includeSchedulingPriority(true) - .includeMessagePriority(true)); + .includeMessagePriority(true) + .prioritize_global_bucket_merges(true)); +} + +TEST_F(StateCheckersTest, global_bucket_merges_have_normal_priority_if_prioritization_disabled) { + runAndVerify( + CheckerParams().expect( + "[Synchronizing buckets with different checksums " + "node(idx=0,crc=0x1,docs=1/1,bytes=1/1,trusted=false,active=false,ready=false), " + "node(idx=1,crc=0x2,docs=2/2,bytes=2/2,trusted=false,active=false,ready=false)] " + "(pri 120) " + "(scheduling pri MEDIUM)") + .bucketInfo("0=1,1=2") + .bucket_space(document::FixedBucketSpaces::global_space()) + .includeSchedulingPriority(true) + .includeMessagePriority(true) + .prioritize_global_bucket_merges(false)); } // Upon entering a cluster state transition edge the distributor will diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index 596d4aad298..b3830c7e042 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -44,6 +44,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _merge_operations_disabled(false), _use_weak_internal_read_consistency_for_client_gets(false), _enable_metadata_only_fetch_phase_for_inconsistent_updates(false), + _prioritize_global_bucket_merges(true), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -161,6 +162,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _merge_operations_disabled = config.mergeOperationsDisabled; _use_weak_internal_read_consistency_for_client_gets = config.useWeakInternalReadConsistencyForClientGets; _enable_metadata_only_fetch_phase_for_inconsistent_updates = config.enableMetadataOnlyFetchPhaseForInconsistentUpdates; + _prioritize_global_bucket_merges = config.prioritizeGlobalBucketMerges; _minimumReplicaCountingMode = config.minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index f8f3c3f6e07..41a30165f49 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -247,6 +247,13 @@ public: return _max_consecutively_inhibited_maintenance_ticks; } + void set_prioritize_global_bucket_merges(bool prioritize) noexcept { + _prioritize_global_bucket_merges = prioritize; + } + bool prioritize_global_bucket_merges() const noexcept { + return _prioritize_global_bucket_merges; + } + bool containsTimeStatement(const std::string& documentSelection) const; private: @@ -295,6 +302,7 @@ private: bool _merge_operations_disabled; bool _use_weak_internal_read_consistency_for_client_gets; bool _enable_metadata_only_fetch_phase_for_inconsistent_updates; + bool _prioritize_global_bucket_merges; DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 92bdaf5dda8..54f6006895e 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -250,3 +250,12 @@ enable_metadata_only_fetch_phase_for_inconsistent_updates bool default=false ## This is to reduce the amount of CPU spent on ideal state calculations and bucket DB ## accesses when the distributor is heavily loaded with feed operations. max_consecutively_inhibited_maintenance_ticks int default=20 + +## If set, pending merges to buckets in the global bucket space will be prioritized +## higher than merges to buckets in the default bucket space. This ensures that global +## documents will be kept in sync without being starved by non-global documents. +## Note that enabling this feature risks starving default bucket space merges if a +## resource exhaustion case prevents global merges from completing. +## This is a live config for that reason, i.e. it can be disabled in an emergency +## situation if needed. +prioritize_global_bucket_merges bool default=true diff --git a/storage/src/vespa/storage/distributor/statecheckers.cpp b/storage/src/vespa/storage/distributor/statecheckers.cpp index b4913aee5c3..e861adda428 100644 --- a/storage/src/vespa/storage/distributor/statecheckers.cpp +++ b/storage/src/vespa/storage/distributor/statecheckers.cpp @@ -864,7 +864,9 @@ SynchronizeAndMoveStateChecker::check(StateChecker::Context& c) c.distributorConfig.getMaxNodesPerMerge())); op->setDetailedReason(result.reason()); MaintenancePriority::Priority schedPri; - if (c.getBucketSpace() == document::FixedBucketSpaces::default_space()) { + if ((c.getBucketSpace() == document::FixedBucketSpaces::default_space()) + || !c.distributorConfig.prioritize_global_bucket_merges()) + { schedPri = (result.needsMoveOnly() ? MaintenancePriority::LOW : MaintenancePriority::MEDIUM); op->setPriority(result.priority()); -- cgit v1.2.3