aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-11-04 07:23:40 +0100
committerGitHub <noreply@github.com>2020-11-04 07:23:40 +0100
commitd656e179fa22eff22188e46e3b6c26da84a4d0ed (patch)
tree27b36eaa84226f399e8f4f6f26a01a987f1ce7fb
parentf991182476272cf1f80787300fd18b47b08ba08b (diff)
parent5247f841c003d9275ebe919fe8367f016ef24f38 (diff)
Merge pull request #15144 from vespa-engine/vekterli/increase-priority-for-global-bucket-merges
Increase priority for global bucket merges
-rw-r--r--storage/src/tests/distributor/distributortest.cpp19
-rw-r--r--storage/src/tests/distributor/statecheckerstest.cpp57
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp3
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h9
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def17
-rw-r--r--storage/src/vespa/storage/distributor/statecheckers.cpp18
6 files changed, 112 insertions, 11 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 268a58a140e..d5cf734ed0a 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -200,6 +200,12 @@ struct DistributorTest : Test, DistributorTestUtil {
configureDistributor(builder);
}
+ void configure_prioritize_global_bucket_merges(bool enabled) {
+ ConfigBuilder builder;
+ builder.prioritizeGlobalBucketMerges = enabled;
+ configureDistributor(builder);
+ }
+
void configureMaxClusterClockSkew(int seconds);
void sendDownClusterStateCommand();
void replyToSingleRequestBucketInfoCommandWith1Bucket();
@@ -521,6 +527,7 @@ TEST_F(DistributorTest, priority_config_is_propagated_to_distributor_configurati
builder.prioritySplitLargeBucket = 9;
builder.prioritySplitInconsistentBucket = 10;
builder.priorityGarbageCollection = 11;
+ builder.priorityMergeGlobalBuckets = 12;
getConfig().configure(builder);
@@ -536,6 +543,7 @@ TEST_F(DistributorTest, priority_config_is_propagated_to_distributor_configurati
EXPECT_EQ(9, static_cast<int>(mp.splitLargeBucket));
EXPECT_EQ(10, static_cast<int>(mp.splitInconsistentBucket));
EXPECT_EQ(11, static_cast<int>(mp.garbageCollection));
+ EXPECT_EQ(12, static_cast<int>(mp.mergeGlobalBuckets));
}
TEST_F(DistributorTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) {
@@ -1169,4 +1177,15 @@ TEST_F(DistributorTest, closing_aborts_gets_started_outside_main_distributor_thr
EXPECT_EQ(api::ReturnCode::ABORTED, _sender.reply(0)->getResult().getResult());
}
+TEST_F(DistributorTest, prioritize_global_bucket_merges_config_is_propagated_to_internal_config) {
+ createLinks();
+ setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ configure_prioritize_global_bucket_merges(true);
+ EXPECT_TRUE(getConfig().prioritize_global_bucket_merges());
+
+ configure_prioritize_global_bucket_merges(false);
+ EXPECT_FALSE(getConfig().prioritize_global_bucket_merges());
+}
+
}
diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp
index cbe1c93a4ba..f66aab26dc9 100644
--- a/storage/src/tests/distributor/statecheckerstest.cpp
+++ b/storage/src/tests/distributor/statecheckerstest.cpp
@@ -2,6 +2,7 @@
#include "distributortestutil.h"
#include <vespa/config-stor-distribution.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/bucketdbupdater.h>
@@ -79,8 +80,8 @@ struct StateCheckersTest : Test, DistributorTestUtil {
.getSibling(c.getBucketId());
std::vector<BucketDatabase::Entry> entries;
- getBucketDatabase().getAll(c.getBucketId(), entries);
- c.siblingEntry = getBucketDatabase().get(c.siblingBucket);
+ getBucketDatabase(c.getBucketSpace()).getAll(c.getBucketId(), entries);
+ c.siblingEntry = getBucketDatabase(c.getBucketSpace()).get(c.siblingBucket);
c.entries = entries;
for (uint32_t j = 0; j < entries.size(); ++j) {
@@ -126,7 +127,7 @@ struct StateCheckersTest : Test, DistributorTestUtil {
ost << "NO OPERATIONS GENERATED";
}
- getBucketDatabase().clear();
+ getBucketDatabase(c.getBucketSpace()).clear();
return ost.str();
}
@@ -160,6 +161,7 @@ struct StateCheckersTest : Test, DistributorTestUtil {
std::string _clusterState {"distributor:1 storage:2"};
std::string _pending_cluster_state;
std::string _expect;
+ document::BucketSpace _bucket_space {document::FixedBucketSpaces::default_space()};
static const PendingMessage NO_OP_BLOCKER;
const PendingMessage* _blockerMessage {&NO_OP_BLOCKER};
uint32_t _redundancy {2};
@@ -169,6 +171,7 @@ struct StateCheckersTest : Test, DistributorTestUtil {
bool _includeMessagePriority {false};
bool _includeSchedulingPriority {false};
bool _merge_operations_disabled {false};
+ bool _prioritize_global_bucket_merges {true};
CheckerParams();
~CheckerParams();
@@ -208,6 +211,14 @@ struct StateCheckersTest : Test, DistributorTestUtil {
_merge_operations_disabled = disabled;
return *this;
}
+ CheckerParams& prioritize_global_bucket_merges(bool enabled) noexcept {
+ _prioritize_global_bucket_merges = enabled;
+ return *this;
+ }
+ CheckerParams& bucket_space(document::BucketSpace bucket_space) noexcept {
+ _bucket_space = bucket_space;
+ return *this;
+ }
};
template <typename CheckerImpl>
@@ -215,18 +226,22 @@ struct StateCheckersTest : Test, DistributorTestUtil {
CheckerImpl checker;
document::BucketId bid(17, 0);
- addNodesToBucketDB(bid, params._bucketInfo);
+ document::Bucket bucket(params._bucket_space, bid);
+ addNodesToBucketDB(bucket, params._bucketInfo);
setRedundancy(params._redundancy);
enableDistributorClusterState(params._clusterState);
getConfig().set_merge_operations_disabled(params._merge_operations_disabled);
+ getConfig().set_prioritize_global_bucket_merges(params._prioritize_global_bucket_merges);
if (!params._pending_cluster_state.empty()) {
auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(params._pending_cluster_state));
_distributor->onDown(cmd);
tick(); // Trigger command processing and pending state setup.
}
NodeMaintenanceStatsTracker statsTracker;
- StateChecker::Context c(
- getExternalOperationHandler(), getDistributorBucketSpace(), statsTracker, makeDocumentBucket(bid));
+ StateChecker::Context c(getExternalOperationHandler(),
+ getBucketSpaceRepo().get(params._bucket_space),
+ statsTracker,
+ bucket);
std::string result = testStateChecker(
checker, c, false, *params._blockerMessage,
params._includeMessagePriority,
@@ -683,6 +698,36 @@ TEST_F(StateCheckersTest, synchronize_and_move) {
.clusterState("distributor:1 storage:4"));
}
+TEST_F(StateCheckersTest, global_bucket_merges_have_high_priority_if_prioritization_enabled) {
+ runAndVerify<SynchronizeAndMoveStateChecker>(
+ CheckerParams().expect(
+ "[Synchronizing buckets with different checksums "
+ "node(idx=0,crc=0x1,docs=1/1,bytes=1/1,trusted=false,active=false,ready=false), "
+ "node(idx=1,crc=0x2,docs=2/2,bytes=2/2,trusted=false,active=false,ready=false)] "
+ "(pri 115) "
+ "(scheduling pri HIGH)")
+ .bucketInfo("0=1,1=2")
+ .bucket_space(document::FixedBucketSpaces::global_space())
+ .includeSchedulingPriority(true)
+ .includeMessagePriority(true)
+ .prioritize_global_bucket_merges(true));
+}
+
+TEST_F(StateCheckersTest, global_bucket_merges_have_normal_priority_if_prioritization_disabled) {
+ runAndVerify<SynchronizeAndMoveStateChecker>(
+ CheckerParams().expect(
+ "[Synchronizing buckets with different checksums "
+ "node(idx=0,crc=0x1,docs=1/1,bytes=1/1,trusted=false,active=false,ready=false), "
+ "node(idx=1,crc=0x2,docs=2/2,bytes=2/2,trusted=false,active=false,ready=false)] "
+ "(pri 120) "
+ "(scheduling pri MEDIUM)")
+ .bucketInfo("0=1,1=2")
+ .bucket_space(document::FixedBucketSpaces::global_space())
+ .includeSchedulingPriority(true)
+ .includeMessagePriority(true)
+ .prioritize_global_bucket_merges(false));
+}
+
// Upon entering a cluster state transition edge the distributor will
// prune all replicas from its DB that are on nodes that are unavailable
// in the _pending_ state. As long as this state is pending, the _current_
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index aa606cdc8b9..b3830c7e042 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -44,6 +44,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_merge_operations_disabled(false),
_use_weak_internal_read_consistency_for_client_gets(false),
_enable_metadata_only_fetch_phase_for_inconsistent_updates(false),
+ _prioritize_global_bucket_merges(true),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{
}
@@ -94,6 +95,7 @@ DistributorConfiguration::configureMaintenancePriorities(
mp.mergeMoveToIdealNode = cfg.priorityMergeMoveToIdealNode;
mp.mergeOutOfSyncCopies = cfg.priorityMergeOutOfSyncCopies;
mp.mergeTooFewCopies = cfg.priorityMergeTooFewCopies;
+ mp.mergeGlobalBuckets = cfg.priorityMergeGlobalBuckets;
mp.activateNoExistingActive = cfg.priorityActivateNoExistingActive;
mp.activateWithExistingActive = cfg.priorityActivateWithExistingActive;
mp.deleteBucketCopy = cfg.priorityDeleteBucketCopy;
@@ -160,6 +162,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_merge_operations_disabled = config.mergeOperationsDisabled;
_use_weak_internal_read_consistency_for_client_gets = config.useWeakInternalReadConsistencyForClientGets;
_enable_metadata_only_fetch_phase_for_inconsistent_updates = config.enableMetadataOnlyFetchPhaseForInconsistentUpdates;
+ _prioritize_global_bucket_merges = config.prioritizeGlobalBucketMerges;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 0c4b1f5756c..41a30165f49 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -23,6 +23,7 @@ public:
uint8_t mergeMoveToIdealNode {120};
uint8_t mergeOutOfSyncCopies {120};
uint8_t mergeTooFewCopies {120};
+ uint8_t mergeGlobalBuckets {115};
uint8_t activateNoExistingActive {100};
uint8_t activateWithExistingActive {100};
uint8_t deleteBucketCopy {100};
@@ -246,6 +247,13 @@ public:
return _max_consecutively_inhibited_maintenance_ticks;
}
+ void set_prioritize_global_bucket_merges(bool prioritize) noexcept {
+ _prioritize_global_bucket_merges = prioritize;
+ }
+ bool prioritize_global_bucket_merges() const noexcept {
+ return _prioritize_global_bucket_merges;
+ }
+
bool containsTimeStatement(const std::string& documentSelection) const;
private:
@@ -294,6 +302,7 @@ private:
bool _merge_operations_disabled;
bool _use_weak_internal_read_consistency_for_client_gets;
bool _enable_metadata_only_fetch_phase_for_inconsistent_updates;
+ bool _prioritize_global_bucket_merges;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index db2bfb61376..54f6006895e 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -89,6 +89,12 @@ priority_merge_out_of_sync_copies int default=120
## Merge for restoring redundancy of copies
priority_merge_too_few_copies int default=120
+## Merge that involves a global bucket. There are generally significantly fewer such
+## buckets than default-space buckets, and searches to documents in the default space
+## may depend on the presence of (all) global documents. Consequently, this priority
+## should be higher (i.e. numerically smaller) than that of regular merges.
+priority_merge_global_buckets int default=115
+
## Copy activation when there are no other active copies (likely causing
## lack of search coverage for that bucket)
priority_activate_no_existing_active int default=100
@@ -96,7 +102,7 @@ priority_activate_no_existing_active int default=100
## Copy activation when there is already an active copy for the bucket.
priority_activate_with_existing_active int default=100
-## Deletion of bucket copy. Cheap on VDS, not necessarily so on indexed search.
+## Deletion of bucket copy.
priority_delete_bucket_copy int default=100
## Joining caused by bucket siblings getting sufficiently small to fit into a
@@ -244,3 +250,12 @@ enable_metadata_only_fetch_phase_for_inconsistent_updates bool default=false
## This is to reduce the amount of CPU spent on ideal state calculations and bucket DB
## accesses when the distributor is heavily loaded with feed operations.
max_consecutively_inhibited_maintenance_ticks int default=20
+
+## If set, pending merges to buckets in the global bucket space will be prioritized
+## higher than merges to buckets in the default bucket space. This ensures that global
+## documents will be kept in sync without being starved by non-global documents.
+## Note that enabling this feature risks starving default bucket space merges if a
+## resource exhaustion case prevents global merges from completing.
+## This is a live config for that reason, i.e. it can be disabled in an emergency
+## situation if needed.
+prioritize_global_bucket_merges bool default=true
diff --git a/storage/src/vespa/storage/distributor/statecheckers.cpp b/storage/src/vespa/storage/distributor/statecheckers.cpp
index 2da4dd529c5..e861adda428 100644
--- a/storage/src/vespa/storage/distributor/statecheckers.cpp
+++ b/storage/src/vespa/storage/distributor/statecheckers.cpp
@@ -2,6 +2,7 @@
#include "statecheckers.h"
#include "activecopy.h"
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/storage/distributor/operations/idealstate/splitoperation.h>
#include <vespa/storage/distributor/operations/idealstate/joinoperation.h>
#include <vespa/storage/distributor/operations/idealstate/removebucketoperation.h>
@@ -861,11 +862,20 @@ SynchronizeAndMoveStateChecker::check(StateChecker::Context& c)
IdealStateOperation::UP op(
new MergeOperation(BucketAndNodes(c.getBucket(), result.nodes()),
c.distributorConfig.getMaxNodesPerMerge()));
- op->setPriority(result.priority());
op->setDetailedReason(result.reason());
- MaintenancePriority::Priority schedPri(
- result.needsMoveOnly() ? MaintenancePriority::LOW
- : MaintenancePriority::MEDIUM);
+ MaintenancePriority::Priority schedPri;
+ if ((c.getBucketSpace() == document::FixedBucketSpaces::default_space())
+ || !c.distributorConfig.prioritize_global_bucket_merges())
+ {
+ schedPri = (result.needsMoveOnly() ? MaintenancePriority::LOW
+ : MaintenancePriority::MEDIUM);
+ op->setPriority(result.priority());
+ } else {
+ // Since the default bucket space has a dependency on the global bucket space,
+ // we prioritize scheduling of merges to global buckets over those for default buckets.
+ schedPri = MaintenancePriority::HIGH;
+ op->setPriority(c.distributorConfig.getMaintenancePriorities().mergeGlobalBuckets);
+ }
return Result::createStoredResult(std::move(op), schedPri);
} else {