summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-06-09 12:01:10 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-06-09 12:01:10 +0000
commit47c98e6d137d158d1a4fb0cae00db4298f97ec7b (patch)
treedcb5764973a2077b6291664ebeffdeafa9ece1fb /storage
parent7a7296a89fecd6238bf845741157b6c6971602ff (diff)
Route CreateVisitorCommand with too few used bits in the super bucket id to a random distributor stripe.
Such commands will eventually be bounced with WRONG_DISTRIBUTION when handled by the stripe.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp25
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h6
2 files changed, 26 insertions, 5 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 65945b2c6ae..7ac15b3929c 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -68,6 +68,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_stripe_pool(stripe_pool),
_stripes(),
_stripe_accessor(),
+ _random_stripe_gen(),
+ _random_stripe_gen_mutex(),
_message_queue(),
_fetched_messages(),
_component(*this, compReg, "distributor"),
@@ -364,16 +366,29 @@ get_bucket_id_for_striping(const api::StorageMessage& msg, const DistributorNode
return msg.getBucketId();
}
+}
+
uint32_t
-stripe_of_bucket_id(const document::BucketId& bucketd_id, uint8_t n_stripe_bits)
+Distributor::random_stripe_idx()
+{
+ std::lock_guard lock(_random_stripe_gen_mutex);
+ return _random_stripe_gen.nextUint32() % _stripes.size();
+}
+
+uint32_t
+Distributor::stripe_of_bucket_id(const document::BucketId& bucketd_id, api::MessageType::Id msg_id)
{
if (!bucketd_id.isSet()) {
// TODO STRIPE: Messages with a non-set bucket id should be handled by the top-level distributor instead.
return 0;
+ } else if (bucketd_id.getUsedBits() < spi::BucketLimits::MinUsedBits) {
+ if (msg_id == api::MessageType::VISITOR_CREATE_ID) {
+ // This message will eventually be bounced with api::ReturnCode::WRONG_DISTRIBUTION,
+ // so we can just route it to a random distributor stripe.
+ return random_stripe_idx();
+ }
}
- return storage::stripe_of_bucket_key(bucketd_id.toKey(), n_stripe_bits);
-}
-
+ return storage::stripe_of_bucket_key(bucketd_id.toKey(), _n_stripe_bits);
}
bool
@@ -389,7 +404,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
return true;
}
auto bucket_id = get_bucket_id_for_striping(*msg, _component);
- uint32_t stripe_idx = stripe_of_bucket_id(bucket_id, _n_stripe_bits);
+ uint32_t stripe_idx = stripe_of_bucket_id(bucket_id, msg->getType().getId());
MBUS_TRACE(msg->getTrace(), 9,
vespalib::make_string("Distributor::onDown(): Dispatch message to stripe %u", stripe_idx));
bool handled = _stripes[stripe_idx]->handle_or_enqueue_message(msg);
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 61a1f06309d..6f9654b78b8 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -24,6 +24,7 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/metric/metricupdatehook.h>
#include <vespa/storageframework/generic/thread/tickingthread.h>
+#include <vespa/vdslib/state/random.h>
#include <chrono>
#include <queue>
#include <unordered_map>
@@ -189,6 +190,9 @@ private:
// Precondition: _stripe_scan_notify_mutex is held
[[nodiscard]] bool may_send_host_info_on_behalf_of_stripes(std::lock_guard<std::mutex>& held_lock) noexcept;
+ uint32_t random_stripe_idx();
+ uint32_t stripe_of_bucket_id(const document::BucketId& bucketd_id, api::MessageType::Id msg_id);
+
struct StripeScanStats {
bool wants_to_send_host_info = false;
bool has_reported_in_at_least_once = false;
@@ -206,6 +210,8 @@ private:
DistributorStripePool& _stripe_pool;
std::vector<std::unique_ptr<DistributorStripe>> _stripes;
std::unique_ptr<StripeAccessor> _stripe_accessor;
+ storage::lib::RandomGen _random_stripe_gen;
+ std::mutex _random_stripe_gen_mutex;
MessageQueue _message_queue; // Queue for top-level ops
MessageQueue _fetched_messages;
distributor::DistributorComponent _component;