diff options
Diffstat (limited to 'storage')
6 files changed, 83 insertions, 23 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index d85a4ff41d1..368a25315fb 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -23,6 +23,7 @@ #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/config/distributorconfiguration.h> #include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h> +#include <vespa/storageapi/message/persistence.h> #include <vespa/storageframework/generic/status/xmlstatusreporter.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/util/memoryusage.h> @@ -60,6 +61,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _metrics(std::make_shared<DistributorMetricSet>()), _messageSender(messageSender), _use_legacy_mode(num_distributor_stripes == 0), + _n_stripe_bits(0), _stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, doneInitHandler, *this, *this, _use_legacy_mode)), _stripe_pool(stripe_pool), @@ -90,13 +92,19 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0)); if (!_use_legacy_mode) { assert(num_distributor_stripes == adjusted_num_stripes(num_distributor_stripes)); - LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone + _n_stripe_bits = calc_num_stripe_bits(num_distributor_stripes); + LOG(info, "Setting up distributor with %u stripes using %u stripe bits", + num_distributor_stripes, _n_stripe_bits); // TODO STRIPE remove once legacy gone _stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(_stripe_pool); _bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component, *this, *this, _component.getDistribution(), *_stripe_accessor); _stripes.emplace_back(std::move(_stripe)); + for (size_t i = 1; i < num_distributor_stripes; ++i) { + _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, + doneInitHandler, *this, *this, _use_legacy_mode, i)); + } _stripe_scan_stats.resize(num_distributor_stripes); _distributorStatusDelegate.registerStatusPage(); _bucket_db_status_delegate = std::make_unique<StatusReporterDelegate>(compReg, *this, *_bucket_db_updater); @@ -116,14 +124,12 @@ Distributor::~Distributor() // TODO STRIPE remove DistributorStripe& Distributor::first_stripe() noexcept { - assert(_stripes.size() == 1); return *_stripes[0]; } // TODO STRIPE remove const DistributorStripe& Distributor::first_stripe() const noexcept { - assert(_stripes.size() == 1); return *_stripes[0]; } @@ -254,7 +260,10 @@ Distributor::onOpen() _threadPool.addThread(*this); _threadPool.start(_component.getThreadPool()); if (!_use_legacy_mode) { - std::vector<TickableStripe*> pool_stripes({_stripes[0].get()}); + std::vector<TickableStripe*> pool_stripes; + for (auto& stripe : _stripes) { + pool_stripes.push_back(stripe.get()); + } _stripe_pool.start(pool_stripes); } } else { @@ -321,6 +330,43 @@ bool should_be_handled_by_top_level_bucket_db_updater(const api::StorageMessage& } } +document::BucketId +get_bucket_id_for_striping(const api::StorageMessage& msg, const DistributorNodeContext& node_ctx) +{ + if (!msg.getBucketId().isSet()) { + // Calculate a bucket id (dependent on the message type) to dispatch the message to the correct distributor stripe. + switch (msg.getType().getId()) { + case api::MessageType::PUT_ID: + case api::MessageType::UPDATE_ID: + case api::MessageType::REMOVE_ID: + return node_ctx.bucket_id_factory().getBucketId(dynamic_cast<const api::TestAndSetCommand&>(msg).getDocumentId()); + case api::MessageType::REQUESTBUCKETINFO_REPLY_ID: + { + const auto& reply = dynamic_cast<const api::RequestBucketInfoReply&>(msg); + if (!reply.getBucketInfo().empty()) { + // Note: All bucket ids in this reply belong to the same distributor stripe, so we just use the first entry. + return reply.getBucketInfo()[0]._bucketId; + } else { + return reply.getBucketId(); + } + } + default: + return msg.getBucketId(); + } + } + return msg.getBucketId(); +} + +uint32_t +stripe_of_bucket_id(const document::BucketId& bucketd_id, uint8_t n_stripe_bits) +{ + if (!bucketd_id.isSet()) { + // TODO STRIPE: Messages with a non-set bucket id should be handled by the top-level distributor instead. + return 0; + } + return storage::stripe_of_bucket_key(bucketd_id.toKey(), n_stripe_bits); +} + } bool @@ -335,12 +381,13 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) dispatch_to_main_distributor_thread_queue(msg); return true; } - assert(_stripes.size() == 1); - assert(_stripe_pool.stripe_count() == 1); - // TODO STRIPE correct routing with multiple stripes - bool handled = first_stripe().handle_or_enqueue_message(msg); + auto bucket_id = get_bucket_id_for_striping(*msg, _component); + uint32_t stripe_idx = stripe_of_bucket_id(bucket_id, _n_stripe_bits); + MBUS_TRACE(msg->getTrace(), 9, + vespalib::make_string("Distributor::onDown(): Dispatch message to stripe %u", stripe_idx)); + bool handled = _stripes[stripe_idx]->handle_or_enqueue_message(msg); if (handled) { - _stripe_pool.stripe_thread(0).notify_event_has_triggered(); + _stripe_pool.stripe_thread(stripe_idx).notify_event_has_triggered(); } return handled; } @@ -442,7 +489,6 @@ Distributor::propagateDefaultDistribution( } else { // Should only be called at ctor time, at which point the pool is not yet running. assert(_stripe_pool.stripe_count() == 0); - assert(_stripes.size() == 1); // TODO STRIPE all the stripes yes auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); for (auto& stripe : _stripes) { stripe->update_distribution_config(new_configs); diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 50bd2526ff4..61a1f06309d 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -201,6 +201,7 @@ private: ChainedMessageSender* _messageSender; const bool _use_legacy_mode; // TODO STRIPE multiple stripes...! This is for proof of concept of wiring. + uint8_t _n_stripe_bits; std::unique_ptr<DistributorStripe> _stripe; DistributorStripePool& _stripe_pool; std::vector<std::unique_ptr<DistributorStripe>> _stripes; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 5c6c529fe69..bf78707cfd9 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -41,7 +41,8 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, DoneInitializeHandler& doneInitHandler, ChainedMessageSender& messageSender, StripeHostInfoNotifier& stripe_host_info_notifier, - bool use_legacy_mode) + bool use_legacy_mode, + uint32_t stripe_index) : DistributorStripeInterface(), framework::StatusReporter("distributor", "Distributor"), _clusterStateBundle(lib::ClusterState()), @@ -57,7 +58,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _bucketDBUpdater(_component, _component, *this, *this, use_legacy_mode), _distributorStatusDelegate(compReg, *this, *this), _bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater), - _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg), + _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, stripe_index), _messageSender(messageSender), _stripe_host_info_notifier(stripe_host_info_notifier), _externalOperationHandler(_component, _component, getMetrics(), getMessageSender(), @@ -86,7 +87,8 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _last_db_memory_sample_time_point(), _inhibited_maintenance_tick_count(0), _must_send_updated_host_info(false), - _use_legacy_mode(use_legacy_mode) + _use_legacy_mode(use_legacy_mode), + _stripe_index(stripe_index) { if (use_legacy_mode) { _distributorStatusDelegate.registerStatusPage(); @@ -176,7 +178,7 @@ DistributorStripe::handle_or_enqueue_message(const std::shared_ptr<api::StorageM return true; } MBUS_TRACE(msg->getTrace(), 9, - "Distributor: Added to message queue. Thread state: " + vespalib::make_string("DistributorStripe[%u]: Added to message queue. Thread state: ", _stripe_index) + _threadPool.getStatus()); if (_use_legacy_mode) { // TODO STRIPE remove @@ -570,7 +572,8 @@ bool is_client_request(const api::StorageMessage& msg) noexcept { void DistributorStripe::handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg) { if (!handleMessage(msg)) { - MBUS_TRACE(msg->getTrace(), 9, "Distributor: Not handling it. Sending further down."); + MBUS_TRACE(msg->getTrace(), 9, + vespalib::make_string("DistributorStripe[%u]: Not handling it. Sending further down", _stripe_index)); _messageSender.sendDown(msg); } } @@ -578,10 +581,12 @@ void DistributorStripe::handle_or_propagate_message(const std::shared_ptr<api::S void DistributorStripe::startExternalOperations() { for (auto& msg : _fetchedMessages) { if (is_client_request(*msg)) { - MBUS_TRACE(msg->getTrace(), 9, "Distributor: adding to client request priority queue"); + MBUS_TRACE(msg->getTrace(), 9, + vespalib::make_string("DistributorStripe[%u]: Adding to client request priority queue", _stripe_index)); _client_request_priority_queue.emplace(std::move(msg)); } else { - MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from queue to be processed."); + MBUS_TRACE(msg->getTrace(), 9, + vespalib::make_string("DistributorStripe[%u]: Grabbed from queue to be processed", _stripe_index)); handle_or_propagate_message(msg); } } @@ -589,8 +594,9 @@ void DistributorStripe::startExternalOperations() { const bool start_single_client_request = !_client_request_priority_queue.empty(); if (start_single_client_request) { const auto& msg = _client_request_priority_queue.top(); - MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from " - "client request priority queue to be processed."); + MBUS_TRACE(msg->getTrace(), 9, + vespalib::make_string("DistributorStripe[%u]: Grabbed from " + "client request priority queue to be processed", _stripe_index)); handle_or_propagate_message(msg); _client_request_priority_queue.pop(); } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index b82b5483bd3..347863b6d77 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -64,7 +64,8 @@ public: DoneInitializeHandler&, ChainedMessageSender& messageSender, StripeHostInfoNotifier& stripe_host_info_notifier, - bool use_legacy_mode); + bool use_legacy_mode, + uint32_t stripe_index = 0); ~DistributorStripe() override; @@ -363,6 +364,7 @@ private: size_t _inhibited_maintenance_tick_count; bool _must_send_updated_host_info; bool _use_legacy_mode; + uint32_t _stripe_index; }; } diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp index 7bebe4c001a..013551b8505 100644 --- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp +++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp @@ -28,13 +28,17 @@ IdealStateManager::IdealStateManager( DistributorStripeInterface& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, - DistributorComponentRegister& compReg) + DistributorComponentRegister& compReg, + uint32_t stripe_index) : _metrics(new IdealStateMetricSet), _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Ideal state manager"), _bucketSpaceRepo(bucketSpaceRepo), _has_logged_phantom_replica_warning(false) { - _distributorComponent.registerMetric(*_metrics); + if (stripe_index == 0) { + // TODO STRIPE: Add proper handling of metrics across distributor stripes + _distributorComponent.registerMetric(*_metrics); + } LOG(debug, "Adding BucketStateStateChecker to state checkers"); _stateCheckers.push_back(StateChecker::SP(new BucketStateStateChecker())); diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h index ab7a64142f6..041e009ee9f 100644 --- a/storage/src/vespa/storage/distributor/idealstatemanager.h +++ b/storage/src/vespa/storage/distributor/idealstatemanager.h @@ -36,7 +36,8 @@ public: IdealStateManager(DistributorStripeInterface& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, - DistributorComponentRegister& compReg); + DistributorComponentRegister& compReg, + uint32_t stripe_index = 0); ~IdealStateManager() override; |