diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-11-04 07:23:40 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-04 07:23:40 +0100 |
commit | d656e179fa22eff22188e46e3b6c26da84a4d0ed (patch) | |
tree | 27b36eaa84226f399e8f4f6f26a01a987f1ce7fb | |
parent | f991182476272cf1f80787300fd18b47b08ba08b (diff) | |
parent | 5247f841c003d9275ebe919fe8367f016ef24f38 (diff) |
Merge pull request #15144 from vespa-engine/vekterli/increase-priority-for-global-bucket-merges
Increase priority for global bucket merges
6 files changed, 112 insertions, 11 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 268a58a140e..d5cf734ed0a 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -200,6 +200,12 @@ struct DistributorTest : Test, DistributorTestUtil { configureDistributor(builder); } + void configure_prioritize_global_bucket_merges(bool enabled) { + ConfigBuilder builder; + builder.prioritizeGlobalBucketMerges = enabled; + configureDistributor(builder); + } + void configureMaxClusterClockSkew(int seconds); void sendDownClusterStateCommand(); void replyToSingleRequestBucketInfoCommandWith1Bucket(); @@ -521,6 +527,7 @@ TEST_F(DistributorTest, priority_config_is_propagated_to_distributor_configurati builder.prioritySplitLargeBucket = 9; builder.prioritySplitInconsistentBucket = 10; builder.priorityGarbageCollection = 11; + builder.priorityMergeGlobalBuckets = 12; getConfig().configure(builder); @@ -536,6 +543,7 @@ TEST_F(DistributorTest, priority_config_is_propagated_to_distributor_configurati EXPECT_EQ(9, static_cast<int>(mp.splitLargeBucket)); EXPECT_EQ(10, static_cast<int>(mp.splitInconsistentBucket)); EXPECT_EQ(11, static_cast<int>(mp.garbageCollection)); + EXPECT_EQ(12, static_cast<int>(mp.mergeGlobalBuckets)); } TEST_F(DistributorTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) { @@ -1169,4 +1177,15 @@ TEST_F(DistributorTest, closing_aborts_gets_started_outside_main_distributor_thr EXPECT_EQ(api::ReturnCode::ABORTED, _sender.reply(0)->getResult().getResult()); } +TEST_F(DistributorTest, prioritize_global_bucket_merges_config_is_propagated_to_internal_config) { + createLinks(); + setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); + + configure_prioritize_global_bucket_merges(true); + EXPECT_TRUE(getConfig().prioritize_global_bucket_merges()); + + configure_prioritize_global_bucket_merges(false); + EXPECT_FALSE(getConfig().prioritize_global_bucket_merges()); +} + } diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index cbe1c93a4ba..f66aab26dc9 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -2,6 +2,7 @@ #include "distributortestutil.h" #include <vespa/config-stor-distribution.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/document/test/make_bucket_space.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/bucketdbupdater.h> @@ -79,8 +80,8 @@ struct StateCheckersTest : Test, DistributorTestUtil { .getSibling(c.getBucketId()); std::vector<BucketDatabase::Entry> entries; - getBucketDatabase().getAll(c.getBucketId(), entries); - c.siblingEntry = getBucketDatabase().get(c.siblingBucket); + getBucketDatabase(c.getBucketSpace()).getAll(c.getBucketId(), entries); + c.siblingEntry = getBucketDatabase(c.getBucketSpace()).get(c.siblingBucket); c.entries = entries; for (uint32_t j = 0; j < entries.size(); ++j) { @@ -126,7 +127,7 @@ struct StateCheckersTest : Test, DistributorTestUtil { ost << "NO OPERATIONS GENERATED"; } - getBucketDatabase().clear(); + getBucketDatabase(c.getBucketSpace()).clear(); return ost.str(); } @@ -160,6 +161,7 @@ struct StateCheckersTest : Test, DistributorTestUtil { std::string _clusterState {"distributor:1 storage:2"}; std::string _pending_cluster_state; std::string _expect; + document::BucketSpace _bucket_space {document::FixedBucketSpaces::default_space()}; static const PendingMessage NO_OP_BLOCKER; const PendingMessage* _blockerMessage {&NO_OP_BLOCKER}; uint32_t _redundancy {2}; @@ -169,6 +171,7 @@ struct StateCheckersTest : Test, DistributorTestUtil { bool _includeMessagePriority {false}; bool _includeSchedulingPriority {false}; bool _merge_operations_disabled {false}; + bool _prioritize_global_bucket_merges {true}; CheckerParams(); ~CheckerParams(); @@ -208,6 +211,14 @@ struct StateCheckersTest : Test, DistributorTestUtil { _merge_operations_disabled = disabled; return *this; } + CheckerParams& prioritize_global_bucket_merges(bool enabled) noexcept { + _prioritize_global_bucket_merges = enabled; + return *this; + } + CheckerParams& bucket_space(document::BucketSpace bucket_space) noexcept { + _bucket_space = bucket_space; + return *this; + } }; template <typename CheckerImpl> @@ -215,18 +226,22 @@ struct StateCheckersTest : Test, DistributorTestUtil { CheckerImpl checker; document::BucketId bid(17, 0); - addNodesToBucketDB(bid, params._bucketInfo); + document::Bucket bucket(params._bucket_space, bid); + addNodesToBucketDB(bucket, params._bucketInfo); setRedundancy(params._redundancy); enableDistributorClusterState(params._clusterState); getConfig().set_merge_operations_disabled(params._merge_operations_disabled); + getConfig().set_prioritize_global_bucket_merges(params._prioritize_global_bucket_merges); if (!params._pending_cluster_state.empty()) { auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(params._pending_cluster_state)); _distributor->onDown(cmd); tick(); // Trigger command processing and pending state setup. } NodeMaintenanceStatsTracker statsTracker; - StateChecker::Context c( - getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid)); + StateChecker::Context c(getExternalOperationHandler(), + getBucketSpaceRepo().get(params._bucket_space), + statsTracker, + bucket); std::string result = testStateChecker( checker, c, false, *params._blockerMessage, params._includeMessagePriority, @@ -683,6 +698,36 @@ TEST_F(StateCheckersTest, synchronize_and_move) { .clusterState("distributor:1 storage:4")); } +TEST_F(StateCheckersTest, global_bucket_merges_have_high_priority_if_prioritization_enabled) { + runAndVerify<SynchronizeAndMoveStateChecker>( + CheckerParams().expect( + "[Synchronizing buckets with different checksums " + "node(idx=0,crc=0x1,docs=1/1,bytes=1/1,trusted=false,active=false,ready=false), " + "node(idx=1,crc=0x2,docs=2/2,bytes=2/2,trusted=false,active=false,ready=false)] " + "(pri 115) " + "(scheduling pri HIGH)") + .bucketInfo("0=1,1=2") + .bucket_space(document::FixedBucketSpaces::global_space()) + .includeSchedulingPriority(true) + .includeMessagePriority(true) + .prioritize_global_bucket_merges(true)); +} + +TEST_F(StateCheckersTest, global_bucket_merges_have_normal_priority_if_prioritization_disabled) { + runAndVerify<SynchronizeAndMoveStateChecker>( + CheckerParams().expect( + "[Synchronizing buckets with different checksums " + "node(idx=0,crc=0x1,docs=1/1,bytes=1/1,trusted=false,active=false,ready=false), " + "node(idx=1,crc=0x2,docs=2/2,bytes=2/2,trusted=false,active=false,ready=false)] " + "(pri 120) " + "(scheduling pri MEDIUM)") + .bucketInfo("0=1,1=2") + .bucket_space(document::FixedBucketSpaces::global_space()) + .includeSchedulingPriority(true) + .includeMessagePriority(true) + .prioritize_global_bucket_merges(false)); +} + // Upon entering a cluster state transition edge the distributor will // prune all replicas from its DB that are on nodes that are unavailable // in the _pending_ state. As long as this state is pending, the _current_ diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index aa606cdc8b9..b3830c7e042 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -44,6 +44,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _merge_operations_disabled(false), _use_weak_internal_read_consistency_for_client_gets(false), _enable_metadata_only_fetch_phase_for_inconsistent_updates(false), + _prioritize_global_bucket_merges(true), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -94,6 +95,7 @@ DistributorConfiguration::configureMaintenancePriorities( mp.mergeMoveToIdealNode = cfg.priorityMergeMoveToIdealNode; mp.mergeOutOfSyncCopies = cfg.priorityMergeOutOfSyncCopies; mp.mergeTooFewCopies = cfg.priorityMergeTooFewCopies; + mp.mergeGlobalBuckets = cfg.priorityMergeGlobalBuckets; mp.activateNoExistingActive = cfg.priorityActivateNoExistingActive; mp.activateWithExistingActive = cfg.priorityActivateWithExistingActive; mp.deleteBucketCopy = cfg.priorityDeleteBucketCopy; @@ -160,6 +162,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _merge_operations_disabled = config.mergeOperationsDisabled; _use_weak_internal_read_consistency_for_client_gets = config.useWeakInternalReadConsistencyForClientGets; _enable_metadata_only_fetch_phase_for_inconsistent_updates = config.enableMetadataOnlyFetchPhaseForInconsistentUpdates; + _prioritize_global_bucket_merges = config.prioritizeGlobalBucketMerges; _minimumReplicaCountingMode = config.minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index 0c4b1f5756c..41a30165f49 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -23,6 +23,7 @@ public: uint8_t mergeMoveToIdealNode {120}; uint8_t mergeOutOfSyncCopies {120}; uint8_t mergeTooFewCopies {120}; + uint8_t mergeGlobalBuckets {115}; uint8_t activateNoExistingActive {100}; uint8_t activateWithExistingActive {100}; uint8_t deleteBucketCopy {100}; @@ -246,6 +247,13 @@ public: return _max_consecutively_inhibited_maintenance_ticks; } + void set_prioritize_global_bucket_merges(bool prioritize) noexcept { + _prioritize_global_bucket_merges = prioritize; + } + bool prioritize_global_bucket_merges() const noexcept { + return _prioritize_global_bucket_merges; + } + bool containsTimeStatement(const std::string& documentSelection) const; private: @@ -294,6 +302,7 @@ private: bool _merge_operations_disabled; bool _use_weak_internal_read_consistency_for_client_gets; bool _enable_metadata_only_fetch_phase_for_inconsistent_updates; + bool _prioritize_global_bucket_merges; DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index db2bfb61376..54f6006895e 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -89,6 +89,12 @@ priority_merge_out_of_sync_copies int default=120 ## Merge for restoring redundancy of copies priority_merge_too_few_copies int default=120 +## Merge that involves a global bucket. There are generally significantly fewer such +## buckets than default-space buckets, and searches to documents in the default space +## may depend on the presence of (all) global documents. Consequently, this priority +## should be higher (i.e. numerically smaller) than that of regular merges. +priority_merge_global_buckets int default=115 + ## Copy activation when there are no other active copies (likely causing ## lack of search coverage for that bucket) priority_activate_no_existing_active int default=100 @@ -96,7 +102,7 @@ priority_activate_no_existing_active int default=100 ## Copy activation when there is already an active copy for the bucket. priority_activate_with_existing_active int default=100 -## Deletion of bucket copy. Cheap on VDS, not necessarily so on indexed search. +## Deletion of bucket copy. priority_delete_bucket_copy int default=100 ## Joining caused by bucket siblings getting sufficiently small to fit into a @@ -244,3 +250,12 @@ enable_metadata_only_fetch_phase_for_inconsistent_updates bool default=false ## This is to reduce the amount of CPU spent on ideal state calculations and bucket DB ## accesses when the distributor is heavily loaded with feed operations. max_consecutively_inhibited_maintenance_ticks int default=20 + +## If set, pending merges to buckets in the global bucket space will be prioritized +## higher than merges to buckets in the default bucket space. This ensures that global +## documents will be kept in sync without being starved by non-global documents. +## Note that enabling this feature risks starving default bucket space merges if a +## resource exhaustion case prevents global merges from completing. +## This is a live config for that reason, i.e. it can be disabled in an emergency +## situation if needed. +prioritize_global_bucket_merges bool default=true diff --git a/storage/src/vespa/storage/distributor/statecheckers.cpp b/storage/src/vespa/storage/distributor/statecheckers.cpp index 2da4dd529c5..e861adda428 100644 --- a/storage/src/vespa/storage/distributor/statecheckers.cpp +++ b/storage/src/vespa/storage/distributor/statecheckers.cpp @@ -2,6 +2,7 @@ #include "statecheckers.h" #include "activecopy.h" +#include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/storage/distributor/operations/idealstate/splitoperation.h> #include <vespa/storage/distributor/operations/idealstate/joinoperation.h> #include <vespa/storage/distributor/operations/idealstate/removebucketoperation.h> @@ -861,11 +862,20 @@ SynchronizeAndMoveStateChecker::check(StateChecker::Context& c) IdealStateOperation::UP op( new MergeOperation(BucketAndNodes(c.getBucket(), result.nodes()), c.distributorConfig.getMaxNodesPerMerge())); - op->setPriority(result.priority()); op->setDetailedReason(result.reason()); - MaintenancePriority::Priority schedPri( - result.needsMoveOnly() ? MaintenancePriority::LOW - : MaintenancePriority::MEDIUM); + MaintenancePriority::Priority schedPri; + if ((c.getBucketSpace() == document::FixedBucketSpaces::default_space()) + || !c.distributorConfig.prioritize_global_bucket_merges()) + { + schedPri = (result.needsMoveOnly() ? MaintenancePriority::LOW + : MaintenancePriority::MEDIUM); + op->setPriority(result.priority()); + } else { + // Since the default bucket space has a dependency on the global bucket space, + // we prioritize scheduling of merges to global buckets over those for default buckets. + schedPri = MaintenancePriority::HIGH; + op->setPriority(c.distributorConfig.getMaintenancePriorities().mergeGlobalBuckets); + } return Result::createStoredResult(std::move(op), schedPri); } else { |