summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-06-01 14:42:53 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-06-01 14:42:53 +0000
commit3f0c29a9e6dcef1bd04a5d7c9d3114a1983a3d46 (patch)
tree049ad030e3d7b7e2609e13196ea266db2f738214 /storage
parent8749e1d6e55ac4490a5763cc37c4094d3714a53b (diff)
Add proof of concept support for multiple distributor stripes.
The most basic functionality is now supported using multiple distributor stripes (and threads). Note that the following is (at least) still missing: * Stripe-separate metrics with top-level aggregation. * Aggregation over all stripes in misc functions in Distributor that currently is using the first stripe. * Handling of messages without bucket id in the top-level Distributor instead of using the first stripe.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp66
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h1
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp24
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h4
-rw-r--r--storage/src/vespa/storage/distributor/idealstatemanager.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/idealstatemanager.h3
6 files changed, 83 insertions, 23 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index d85a4ff41d1..368a25315fb 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -23,6 +23,7 @@
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/config/distributorconfiguration.h>
#include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h>
+#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/util/memoryusage.h>
@@ -60,6 +61,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_metrics(std::make_shared<DistributorMetricSet>()),
_messageSender(messageSender),
_use_legacy_mode(num_distributor_stripes == 0),
+ _n_stripe_bits(0),
_stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
doneInitHandler, *this, *this, _use_legacy_mode)),
_stripe_pool(stripe_pool),
@@ -90,13 +92,19 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0));
if (!_use_legacy_mode) {
assert(num_distributor_stripes == adjusted_num_stripes(num_distributor_stripes));
- LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone
+ _n_stripe_bits = calc_num_stripe_bits(num_distributor_stripes);
+ LOG(info, "Setting up distributor with %u stripes using %u stripe bits",
+ num_distributor_stripes, _n_stripe_bits); // TODO STRIPE remove once legacy gone
_stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(_stripe_pool);
_bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component,
*this, *this,
_component.getDistribution(),
*_stripe_accessor);
_stripes.emplace_back(std::move(_stripe));
+ for (size_t i = 1; i < num_distributor_stripes; ++i) {
+ _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
+ doneInitHandler, *this, *this, _use_legacy_mode, i));
+ }
_stripe_scan_stats.resize(num_distributor_stripes);
_distributorStatusDelegate.registerStatusPage();
_bucket_db_status_delegate = std::make_unique<StatusReporterDelegate>(compReg, *this, *_bucket_db_updater);
@@ -116,14 +124,12 @@ Distributor::~Distributor()
// TODO STRIPE remove
DistributorStripe&
Distributor::first_stripe() noexcept {
- assert(_stripes.size() == 1);
return *_stripes[0];
}
// TODO STRIPE remove
const DistributorStripe&
Distributor::first_stripe() const noexcept {
- assert(_stripes.size() == 1);
return *_stripes[0];
}
@@ -254,7 +260,10 @@ Distributor::onOpen()
_threadPool.addThread(*this);
_threadPool.start(_component.getThreadPool());
if (!_use_legacy_mode) {
- std::vector<TickableStripe*> pool_stripes({_stripes[0].get()});
+ std::vector<TickableStripe*> pool_stripes;
+ for (auto& stripe : _stripes) {
+ pool_stripes.push_back(stripe.get());
+ }
_stripe_pool.start(pool_stripes);
}
} else {
@@ -321,6 +330,43 @@ bool should_be_handled_by_top_level_bucket_db_updater(const api::StorageMessage&
}
}
+document::BucketId
+get_bucket_id_for_striping(const api::StorageMessage& msg, const DistributorNodeContext& node_ctx)
+{
+ if (!msg.getBucketId().isSet()) {
+ // Calculate a bucket id (dependent on the message type) to dispatch the message to the correct distributor stripe.
+ switch (msg.getType().getId()) {
+ case api::MessageType::PUT_ID:
+ case api::MessageType::UPDATE_ID:
+ case api::MessageType::REMOVE_ID:
+ return node_ctx.bucket_id_factory().getBucketId(dynamic_cast<const api::TestAndSetCommand&>(msg).getDocumentId());
+ case api::MessageType::REQUESTBUCKETINFO_REPLY_ID:
+ {
+ const auto& reply = dynamic_cast<const api::RequestBucketInfoReply&>(msg);
+ if (!reply.getBucketInfo().empty()) {
+ // Note: All bucket ids in this reply belong to the same distributor stripe, so we just use the first entry.
+ return reply.getBucketInfo()[0]._bucketId;
+ } else {
+ return reply.getBucketId();
+ }
+ }
+ default:
+ return msg.getBucketId();
+ }
+ }
+ return msg.getBucketId();
+}
+
+uint32_t
+stripe_of_bucket_id(const document::BucketId& bucketd_id, uint8_t n_stripe_bits)
+{
+ if (!bucketd_id.isSet()) {
+ // TODO STRIPE: Messages with a non-set bucket id should be handled by the top-level distributor instead.
+ return 0;
+ }
+ return storage::stripe_of_bucket_key(bucketd_id.toKey(), n_stripe_bits);
+}
+
}
bool
@@ -335,12 +381,13 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
dispatch_to_main_distributor_thread_queue(msg);
return true;
}
- assert(_stripes.size() == 1);
- assert(_stripe_pool.stripe_count() == 1);
- // TODO STRIPE correct routing with multiple stripes
- bool handled = first_stripe().handle_or_enqueue_message(msg);
+ auto bucket_id = get_bucket_id_for_striping(*msg, _component);
+ uint32_t stripe_idx = stripe_of_bucket_id(bucket_id, _n_stripe_bits);
+ MBUS_TRACE(msg->getTrace(), 9,
+ vespalib::make_string("Distributor::onDown(): Dispatch message to stripe %u", stripe_idx));
+ bool handled = _stripes[stripe_idx]->handle_or_enqueue_message(msg);
if (handled) {
- _stripe_pool.stripe_thread(0).notify_event_has_triggered();
+ _stripe_pool.stripe_thread(stripe_idx).notify_event_has_triggered();
}
return handled;
}
@@ -442,7 +489,6 @@ Distributor::propagateDefaultDistribution(
} else {
// Should only be called at ctor time, at which point the pool is not yet running.
assert(_stripe_pool.stripe_count() == 0);
- assert(_stripes.size() == 1); // TODO STRIPE all the stripes yes
auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution));
for (auto& stripe : _stripes) {
stripe->update_distribution_config(new_configs);
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 50bd2526ff4..61a1f06309d 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -201,6 +201,7 @@ private:
ChainedMessageSender* _messageSender;
const bool _use_legacy_mode;
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
+ uint8_t _n_stripe_bits;
std::unique_ptr<DistributorStripe> _stripe;
DistributorStripePool& _stripe_pool;
std::vector<std::unique_ptr<DistributorStripe>> _stripes;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 5c6c529fe69..bf78707cfd9 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -41,7 +41,8 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
DoneInitializeHandler& doneInitHandler,
ChainedMessageSender& messageSender,
StripeHostInfoNotifier& stripe_host_info_notifier,
- bool use_legacy_mode)
+ bool use_legacy_mode,
+ uint32_t stripe_index)
: DistributorStripeInterface(),
framework::StatusReporter("distributor", "Distributor"),
_clusterStateBundle(lib::ClusterState()),
@@ -57,7 +58,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_bucketDBUpdater(_component, _component, *this, *this, use_legacy_mode),
_distributorStatusDelegate(compReg, *this, *this),
_bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
- _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg),
+ _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, stripe_index),
_messageSender(messageSender),
_stripe_host_info_notifier(stripe_host_info_notifier),
_externalOperationHandler(_component, _component, getMetrics(), getMessageSender(),
@@ -86,7 +87,8 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_last_db_memory_sample_time_point(),
_inhibited_maintenance_tick_count(0),
_must_send_updated_host_info(false),
- _use_legacy_mode(use_legacy_mode)
+ _use_legacy_mode(use_legacy_mode),
+ _stripe_index(stripe_index)
{
if (use_legacy_mode) {
_distributorStatusDelegate.registerStatusPage();
@@ -176,7 +178,7 @@ DistributorStripe::handle_or_enqueue_message(const std::shared_ptr<api::StorageM
return true;
}
MBUS_TRACE(msg->getTrace(), 9,
- "Distributor: Added to message queue. Thread state: "
+ vespalib::make_string("DistributorStripe[%u]: Added to message queue. Thread state: ", _stripe_index)
+ _threadPool.getStatus());
if (_use_legacy_mode) {
// TODO STRIPE remove
@@ -570,7 +572,8 @@ bool is_client_request(const api::StorageMessage& msg) noexcept {
void DistributorStripe::handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg) {
if (!handleMessage(msg)) {
- MBUS_TRACE(msg->getTrace(), 9, "Distributor: Not handling it. Sending further down.");
+ MBUS_TRACE(msg->getTrace(), 9,
+ vespalib::make_string("DistributorStripe[%u]: Not handling it. Sending further down", _stripe_index));
_messageSender.sendDown(msg);
}
}
@@ -578,10 +581,12 @@ void DistributorStripe::handle_or_propagate_message(const std::shared_ptr<api::S
void DistributorStripe::startExternalOperations() {
for (auto& msg : _fetchedMessages) {
if (is_client_request(*msg)) {
- MBUS_TRACE(msg->getTrace(), 9, "Distributor: adding to client request priority queue");
+ MBUS_TRACE(msg->getTrace(), 9,
+ vespalib::make_string("DistributorStripe[%u]: Adding to client request priority queue", _stripe_index));
_client_request_priority_queue.emplace(std::move(msg));
} else {
- MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from queue to be processed.");
+ MBUS_TRACE(msg->getTrace(), 9,
+ vespalib::make_string("DistributorStripe[%u]: Grabbed from queue to be processed", _stripe_index));
handle_or_propagate_message(msg);
}
}
@@ -589,8 +594,9 @@ void DistributorStripe::startExternalOperations() {
const bool start_single_client_request = !_client_request_priority_queue.empty();
if (start_single_client_request) {
const auto& msg = _client_request_priority_queue.top();
- MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from "
- "client request priority queue to be processed.");
+ MBUS_TRACE(msg->getTrace(), 9,
+ vespalib::make_string("DistributorStripe[%u]: Grabbed from "
+ "client request priority queue to be processed", _stripe_index));
handle_or_propagate_message(msg);
_client_request_priority_queue.pop();
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index b82b5483bd3..347863b6d77 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -64,7 +64,8 @@ public:
DoneInitializeHandler&,
ChainedMessageSender& messageSender,
StripeHostInfoNotifier& stripe_host_info_notifier,
- bool use_legacy_mode);
+ bool use_legacy_mode,
+ uint32_t stripe_index = 0);
~DistributorStripe() override;
@@ -363,6 +364,7 @@ private:
size_t _inhibited_maintenance_tick_count;
bool _must_send_updated_host_info;
bool _use_legacy_mode;
+ uint32_t _stripe_index;
};
}
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
index 7bebe4c001a..013551b8505 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
@@ -28,13 +28,17 @@ IdealStateManager::IdealStateManager(
DistributorStripeInterface& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
- DistributorComponentRegister& compReg)
+ DistributorComponentRegister& compReg,
+ uint32_t stripe_index)
: _metrics(new IdealStateMetricSet),
_distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Ideal state manager"),
_bucketSpaceRepo(bucketSpaceRepo),
_has_logged_phantom_replica_warning(false)
{
- _distributorComponent.registerMetric(*_metrics);
+ if (stripe_index == 0) {
+ // TODO STRIPE: Add proper handling of metrics across distributor stripes
+ _distributorComponent.registerMetric(*_metrics);
+ }
LOG(debug, "Adding BucketStateStateChecker to state checkers");
_stateCheckers.push_back(StateChecker::SP(new BucketStateStateChecker()));
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h
index ab7a64142f6..041e009ee9f 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.h
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.h
@@ -36,7 +36,8 @@ public:
IdealStateManager(DistributorStripeInterface& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
- DistributorComponentRegister& compReg);
+ DistributorComponentRegister& compReg,
+ uint32_t stripe_index = 0);
~IdealStateManager() override;