diff options
Diffstat (limited to 'storage/src/vespa')
9 files changed, 107 insertions, 17 deletions
diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.h b/storage/src/vespa/storage/bucketdb/bucketinfo.h index 9616a1fae6e..1fd6c824904 100644 --- a/storage/src/vespa/storage/bucketdb/bucketinfo.h +++ b/storage/src/vespa/storage/bucketdb/bucketinfo.h @@ -106,6 +106,11 @@ public: return _nodes; } + // If there is a valid majority of replicas that have the same metadata + // (checksum and document count), return that bucket info. + // Otherwise, return default-constructed info with valid() == false. + api::BucketInfo majority_consistent_bucket_info() const noexcept; + std::string toString() const; uint32_t getHighestDocumentCount() const; diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.hpp b/storage/src/vespa/storage/bucketdb/bucketinfo.hpp index 562e8d562fb..07316ff0d71 100644 --- a/storage/src/vespa/storage/bucketdb/bucketinfo.hpp +++ b/storage/src/vespa/storage/bucketdb/bucketinfo.hpp @@ -2,6 +2,7 @@ #include "bucketcopy.h" #include <vespa/vespalib/util/arrayref.h> +#include <vespa/vespalib/stllike/hash_map.hpp> #include <iostream> #include <sstream> @@ -65,6 +66,55 @@ bool BucketInfoBase<NodeSeq>::consistentNodes(bool countInvalidAsConsistent) con return true; } +namespace { + +// BucketInfo wrapper which only concerns itself with fields that indicate +// whether replicas are in sync. +struct ReplicaMetadata { + api::BucketInfo info; + + ReplicaMetadata() noexcept = default; + explicit ReplicaMetadata(const api::BucketInfo& info_) noexcept + : info(info_) + {} + bool operator==(const ReplicaMetadata& rhs) const noexcept { + // TODO merge state checker itself only considers checksum, should we do the same...? + return ((info.getChecksum() == rhs.info.getChecksum()) && + (info.getDocumentCount() == rhs.info.getDocumentCount())); + } + struct hash { + size_t operator()(const ReplicaMetadata& rm) const noexcept { + // We assume that just using the checksum is extremely likely to be unique in the table. + return rm.info.getChecksum(); + } + }; +}; + +constexpr bool is_majority(size_t n, size_t m) { + return (n >= (m / 2) + 1); +} + +} + +template <typename NodeSeq> +api::BucketInfo BucketInfoBase<NodeSeq>::majority_consistent_bucket_info() const noexcept { + if (_nodes.size() < 3) { + return {}; + } + vespalib::hash_map<ReplicaMetadata, uint16_t, ReplicaMetadata::hash> meta_tracker; + for (const auto& n : _nodes) { + if (n.valid()) { + meta_tracker[ReplicaMetadata(n.getBucketInfo())]++; + } + } + for (const auto& meta : meta_tracker) { + if (is_majority(meta.second, _nodes.size())) { + return meta.first.info; + } + } + return {}; +} + template <typename NodeSeq> void BucketInfoBase<NodeSeq>::print(std::ostream& out, bool verbose, const std::string& indent) const { if (_nodes.size() == 0) { diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index a87977bec11..ea963b227e2 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -24,6 +24,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _idealStateChunkSize(1000), _maxNodesPerMerge(16), _max_consecutively_inhibited_maintenance_ticks(20), + _max_activation_inhibited_out_of_sync_groups(0), _lastGarbageCollectionChange(vespalib::duration::zero()), _garbageCollectionInterval(0), _minPendingMaintenanceOps(100), @@ -165,6 +166,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _use_weak_internal_read_consistency_for_client_gets = config.useWeakInternalReadConsistencyForClientGets; _enable_metadata_only_fetch_phase_for_inconsistent_updates = config.enableMetadataOnlyFetchPhaseForInconsistentUpdates; _prioritize_global_bucket_merges = config.prioritizeGlobalBucketMerges; + _max_activation_inhibited_out_of_sync_groups = config.maxActivationInhibitedOutOfSyncGroups; _enable_revert = config.enableRevert; _minimumReplicaCountingMode = config.minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index 5ff1a8b3503..9c1456fa9fd 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -253,6 +253,14 @@ public: bool prioritize_global_bucket_merges() const noexcept { return _prioritize_global_bucket_merges; } + + void set_max_activation_inhibited_out_of_sync_groups(uint32_t max_groups) noexcept { + _max_activation_inhibited_out_of_sync_groups = max_groups; + } + uint32_t max_activation_inhibited_out_of_sync_groups() const noexcept { + return _max_activation_inhibited_out_of_sync_groups; + } + bool enable_revert() const noexcept { return _enable_revert; } @@ -274,6 +282,7 @@ private: uint32_t _idealStateChunkSize; uint32_t _maxNodesPerMerge; uint32_t _max_consecutively_inhibited_maintenance_ticks; + uint32_t _max_activation_inhibited_out_of_sync_groups; std::string _garbageCollectionSelection; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 54f6006895e..6efaf8f8558 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -259,3 +259,17 @@ max_consecutively_inhibited_maintenance_ticks int default=20 ## This is a live config for that reason, i.e. it can be disabled in an emergency ## situation if needed. prioritize_global_bucket_merges bool default=true + +## If set, activation of bucket replicas is limited to only those replicas that have +## bucket info consistent with a majority of the other replicas for that bucket. +## Multiple active replicas is only a feature that is enabled for grouped clusters, +## and this feature is intended to prevent nodes in stale groups (whose buckets are +## likely to be out of sync) from serving query traffic until they have gotten back +## into sync. +## Up to to the given number of groups can have their replica activation inhibited +## by this feature. If zero, the feature is functionally disabled. +## If more groups are out of sync than the configured number N, the inhibited groups +## will be the N first groups present in the distribution config. +## Note: this feature only kicks in if the number of groups in the cluster is greater +## than 1. +max_activation_inhibited_out_of_sync_groups int default=0 diff --git a/storage/src/vespa/storage/distributor/activecopy.cpp b/storage/src/vespa/storage/distributor/activecopy.cpp index 5654e986882..e5bfe489d26 100644 --- a/storage/src/vespa/storage/distributor/activecopy.cpp +++ b/storage/src/vespa/storage/distributor/activecopy.cpp @@ -112,24 +112,17 @@ namespace { } } -#undef DEBUG -#if 0 -#define DEBUG(a) a -#else -#define DEBUG(a) -#endif - ActiveList ActiveCopy::calculate(const std::vector<uint16_t>& idealState, const lib::Distribution& distribution, - BucketDatabase::Entry& e) + BucketDatabase::Entry& e, + uint32_t max_activation_inhibited_out_of_sync_groups) { - DEBUG(std::cerr << "Ideal state is " << idealState << "\n"); std::vector<uint16_t> validNodesWithCopy = buildValidNodeIndexList(e); if (validNodesWithCopy.empty()) { return ActiveList(); } - typedef std::vector<uint16_t> IndexList; + using IndexList = std::vector<uint16_t>; std::vector<IndexList> groups; if (distribution.activePerGroup()) { groups = distribution.splitNodesIntoLeafGroups(std::move(validNodesWithCopy)); @@ -138,11 +131,24 @@ ActiveCopy::calculate(const std::vector<uint16_t>& idealState, } std::vector<ActiveCopy> result; result.reserve(groups.size()); - for (uint32_t i=0; i<groups.size(); ++i) { - std::vector<ActiveCopy> entries = buildNodeList(e, groups[i], idealState); - DEBUG(std::cerr << "Finding active for group " << entries << "\n"); + + auto maybe_majority_info = ((max_activation_inhibited_out_of_sync_groups > 0) + ? e->majority_consistent_bucket_info() + : api::BucketInfo()); // Invalid by default + uint32_t inhibited_groups = 0; + for (const auto& group_nodes : groups) { + std::vector<ActiveCopy> entries = buildNodeList(e, group_nodes, idealState); auto best = std::min_element(entries.begin(), entries.end(), ActiveStateOrder()); - DEBUG(std::cerr << "Best copy " << *best << "\n"); + if ((groups.size() > 1) && + (inhibited_groups < max_activation_inhibited_out_of_sync_groups) && + maybe_majority_info.valid()) + { + const auto* candidate = e->getNode(best->_nodeIndex); + if (!candidate->getBucketInfo().equalDocumentInfo(maybe_majority_info) && !candidate->active()) { + ++inhibited_groups; + continue; // Do _not_ add candidate as activation target since it's out of sync with the majority + } + } result.emplace_back(*best); } return ActiveList(std::move(result)); diff --git a/storage/src/vespa/storage/distributor/activecopy.h b/storage/src/vespa/storage/distributor/activecopy.h index d9f83be3748..d22e7072cc7 100644 --- a/storage/src/vespa/storage/distributor/activecopy.h +++ b/storage/src/vespa/storage/distributor/activecopy.h @@ -17,7 +17,9 @@ struct ActiveCopy { friend std::ostream& operator<<(std::ostream& out, const ActiveCopy& e); static ActiveList calculate(const std::vector<uint16_t>& idealState, - const lib::Distribution&, BucketDatabase::Entry&); + const lib::Distribution&, + BucketDatabase::Entry&, + uint32_t max_activation_inhibited_out_of_sync_groups); uint16_t _nodeIndex; uint16_t _ideal; diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index 7a247f6c524..380c4fdf332 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -109,7 +109,8 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetLi BucketDatabase::Entry entry(_bucketSpace.getBucketDatabase().get(lastBucket)); std::vector<uint16_t> idealState( _bucketSpace.get_ideal_service_layer_nodes_bundle(lastBucket).get_available_nodes()); - active = ActiveCopy::calculate(idealState, _bucketSpace.getDistribution(), entry); + active = ActiveCopy::calculate(idealState, _bucketSpace.getDistribution(), entry, + _op_ctx.distributor_config().max_activation_inhibited_out_of_sync_groups()); LOG(debug, "Active copies for bucket %s: %s", entry.getBucketId().toString().c_str(), active.toString().c_str()); for (uint32_t i=0; i<active.size(); ++i) { BucketCopy copy(*entry->getNode(active[i]._nodeIndex)); diff --git a/storage/src/vespa/storage/distributor/statecheckers.cpp b/storage/src/vespa/storage/distributor/statecheckers.cpp index 796a0434ed4..bdd850e1e23 100644 --- a/storage/src/vespa/storage/distributor/statecheckers.cpp +++ b/storage/src/vespa/storage/distributor/statecheckers.cpp @@ -1060,7 +1060,8 @@ BucketStateStateChecker::check(StateChecker::Context& c) } ActiveList activeNodes( - ActiveCopy::calculate(c.idealState, c.distribution, c.entry)); + ActiveCopy::calculate(c.idealState, c.distribution, c.entry, + c.distributorConfig.max_activation_inhibited_out_of_sync_groups())); if (activeNodes.empty()) { return Result::noMaintenanceNeeded(); } |