diff options
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/vespa/storage/distributor/distributor.cpp | 25 | ||||
-rw-r--r-- | storage/src/vespa/storage/distributor/distributor.h | 6 |
2 files changed, 26 insertions, 5 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 65945b2c6ae..7ac15b3929c 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -68,6 +68,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _stripe_pool(stripe_pool), _stripes(), _stripe_accessor(), + _random_stripe_gen(), + _random_stripe_gen_mutex(), _message_queue(), _fetched_messages(), _component(*this, compReg, "distributor"), @@ -364,16 +366,29 @@ get_bucket_id_for_striping(const api::StorageMessage& msg, const DistributorNode return msg.getBucketId(); } +} + uint32_t -stripe_of_bucket_id(const document::BucketId& bucketd_id, uint8_t n_stripe_bits) +Distributor::random_stripe_idx() +{ + std::lock_guard lock(_random_stripe_gen_mutex); + return _random_stripe_gen.nextUint32() % _stripes.size(); +} + +uint32_t +Distributor::stripe_of_bucket_id(const document::BucketId& bucketd_id, api::MessageType::Id msg_id) { if (!bucketd_id.isSet()) { // TODO STRIPE: Messages with a non-set bucket id should be handled by the top-level distributor instead. return 0; + } else if (bucketd_id.getUsedBits() < spi::BucketLimits::MinUsedBits) { + if (msg_id == api::MessageType::VISITOR_CREATE_ID) { + // This message will eventually be bounced with api::ReturnCode::WRONG_DISTRIBUTION, + // so we can just route it to a random distributor stripe. + return random_stripe_idx(); + } } - return storage::stripe_of_bucket_key(bucketd_id.toKey(), n_stripe_bits); -} - + return storage::stripe_of_bucket_key(bucketd_id.toKey(), _n_stripe_bits); } bool @@ -389,7 +404,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) return true; } auto bucket_id = get_bucket_id_for_striping(*msg, _component); - uint32_t stripe_idx = stripe_of_bucket_id(bucket_id, _n_stripe_bits); + uint32_t stripe_idx = stripe_of_bucket_id(bucket_id, msg->getType().getId()); MBUS_TRACE(msg->getTrace(), 9, vespalib::make_string("Distributor::onDown(): Dispatch message to stripe %u", stripe_idx)); bool handled = _stripes[stripe_idx]->handle_or_enqueue_message(msg); diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 61a1f06309d..6f9654b78b8 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -24,6 +24,7 @@ #include <vespa/storageapi/message/state.h> #include <vespa/storageframework/generic/metric/metricupdatehook.h> #include <vespa/storageframework/generic/thread/tickingthread.h> +#include <vespa/vdslib/state/random.h> #include <chrono> #include <queue> #include <unordered_map> @@ -189,6 +190,9 @@ private: // Precondition: _stripe_scan_notify_mutex is held [[nodiscard]] bool may_send_host_info_on_behalf_of_stripes(std::lock_guard<std::mutex>& held_lock) noexcept; + uint32_t random_stripe_idx(); + uint32_t stripe_of_bucket_id(const document::BucketId& bucketd_id, api::MessageType::Id msg_id); + struct StripeScanStats { bool wants_to_send_host_info = false; bool has_reported_in_at_least_once = false; @@ -206,6 +210,8 @@ private: DistributorStripePool& _stripe_pool; std::vector<std::unique_ptr<DistributorStripe>> _stripes; std::unique_ptr<StripeAccessor> _stripe_accessor; + storage::lib::RandomGen _random_stripe_gen; + std::mutex _random_stripe_gen_mutex; MessageQueue _message_queue; // Queue for top-level ops MessageQueue _fetched_messages; distributor::DistributorComponent _component; |