summary refs log tree commit diff stats
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/vespa/storage/distributor/distributor.cpp 66
-rw-r--r-- storage/src/vespa/storage/distributor/distributor.h 1
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe.cpp 24
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe.h 4
-rw-r--r-- storage/src/vespa/storage/distributor/idealstatemanager.cpp 8
-rw-r--r-- storage/src/vespa/storage/distributor/idealstatemanager.h 3
6 files changed, 83 insertions, 23 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index d85a4ff41d1..368a25315fb 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -23,6 +23,7 @@
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/config/distributorconfiguration.h>
#include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h>
+#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/util/memoryusage.h>
@@ -60,6 +61,7 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_metrics(std::make_shared<DistributorMetricSet>()),
_messageSender(messageSender),
_use_legacy_mode(num_distributor_stripes == 0),
+ _n_stripe_bits(0),
_stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
doneInitHandler, *this, *this, _use_legacy_mode)),
_stripe_pool(stripe_pool),
@@ -90,13 +92,19 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0));
if (!_use_legacy_mode) {
assert(num_distributor_stripes == adjusted_num_stripes(num_distributor_stripes));
- LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone
+ _n_stripe_bits = calc_num_stripe_bits(num_distributor_stripes);
+ LOG(info, "Setting up distributor with %u stripes using %u stripe bits",
+ num_distributor_stripes, _n_stripe_bits); // TODO STRIPE remove once legacy gone
_stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(_stripe_pool);
_bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component,
*this, *this,
_component.getDistribution(),
*_stripe_accessor);
_stripes.emplace_back(std::move(_stripe));
+ for (size_t i = 1; i < num_distributor_stripes; ++i) {
+ _stripes.emplace_back(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool,
+ doneInitHandler, *this, *this, _use_legacy_mode, i));
+ }
_stripe_scan_stats.resize(num_distributor_stripes);
_distributorStatusDelegate.registerStatusPage();
_bucket_db_status_delegate = std::make_unique<StatusReporterDelegate>(compReg, *this, *_bucket_db_updater);
@@ -116,14 +124,12 @@ Distributor::~Distributor()
// TODO STRIPE remove
DistributorStripe&
Distributor::first_stripe() noexcept {
- assert(_stripes.size() == 1);
return *_stripes[0];
}
// TODO STRIPE remove
const DistributorStripe&
Distributor::first_stripe() const noexcept {
- assert(_stripes.size() == 1);
return *_stripes[0];
}
@@ -254,7 +260,10 @@ Distributor::onOpen()
_threadPool.addThread(*this);
_threadPool.start(_component.getThreadPool());
if (!_use_legacy_mode) {
- std::vector<TickableStripe*> pool_stripes({_stripes[0].get()});
+ std::vector<TickableStripe*> pool_stripes;
+ for (auto& stripe : _stripes) {
+ pool_stripes.push_back(stripe.get());
+ }
_stripe_pool.start(pool_stripes);
}
} else {
@@ -321,6 +330,43 @@ bool should_be_handled_by_top_level_bucket_db_updater(const api::StorageMessage&
}
}
+// Picks the bucket id that is used to choose a distributor stripe for `msg`.
+//
+// A message that already carries a set bucket id yields that id unchanged.
+// Otherwise the id depends on the message type:
+//  - Put/Update/Remove: the id that node_ctx's bucket id factory produces
+//    for the document id of the command.
+//  - RequestBucketInfo reply: the id of the first bucket info entry, or the
+//    reply's own bucket id when the reply has no entries.
+//  - Any other type: the message's own (unset) bucket id.
+document::BucketId
+get_bucket_id_for_striping(const api::StorageMessage& msg, const DistributorNodeContext& node_ctx)
+{
+    if (msg.getBucketId().isSet()) {
+        return msg.getBucketId();
+    }
+    // No bucket id on the message itself; derive one from the message type.
+    const auto type_id = msg.getType().getId();
+    const bool carries_document_id = (type_id == api::MessageType::PUT_ID)
+                                  || (type_id == api::MessageType::UPDATE_ID)
+                                  || (type_id == api::MessageType::REMOVE_ID);
+    if (carries_document_id) {
+        const auto& command = dynamic_cast<const api::TestAndSetCommand&>(msg);
+        return node_ctx.bucket_id_factory().getBucketId(command.getDocumentId());
+    }
+    if (type_id == api::MessageType::REQUESTBUCKETINFO_REPLY_ID) {
+        const auto& reply = dynamic_cast<const api::RequestBucketInfoReply&>(msg);
+        if (reply.getBucketInfo().empty()) {
+            return reply.getBucketId();
+        }
+        // Note: All bucket ids in this reply belong to the same distributor stripe, so we just use the first entry.
+        return reply.getBucketInfo()[0]._bucketId;
+    }
+    return msg.getBucketId();
+}
+
+// Maps a bucket id to the index of the distributor stripe that should handle it.
+//
+// An unset bucket id is mapped to stripe 0 (see TODO below). For a set id the
+// result is whatever storage::stripe_of_bucket_key() returns for the id's key
+// and `n_stripe_bits`.
+uint32_t
+stripe_of_bucket_id(const document::BucketId& bucket_id, uint8_t n_stripe_bits)
+{
+    if (!bucket_id.isSet()) {
+        // TODO STRIPE: Messages with a non-set bucket id should be handled by the top-level distributor instead.
+        return 0;
+    }
+    return storage::stripe_of_bucket_key(bucket_id.toKey(), n_stripe_bits);
+}
+
}
bool
@@ -335,12 +381,13 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
dispatch_to_main_distributor_thread_queue(msg);
return true;
}
- assert(_stripes.size() == 1);
- assert(_stripe_pool.stripe_count() == 1);
- // TODO STRIPE correct routing with multiple stripes
- bool handled = first_stripe().handle_or_enqueue_message(msg);
+ auto bucket_id = get_bucket_id_for_striping(*msg, _component);
+ uint32_t stripe_idx = stripe_of_bucket_id(bucket_id, _n_stripe_bits);
+ MBUS_TRACE(msg->getTrace(), 9,
+ vespalib::make_string("Distributor::onDown(): Dispatch message to stripe %u", stripe_idx));
+ bool handled = _stripes[stripe_idx]->handle_or_enqueue_message(msg);
if (handled) {
- _stripe_pool.stripe_thread(0).notify_event_has_triggered();
+ _stripe_pool.stripe_thread(stripe_idx).notify_event_has_triggered();
}
return handled;
}
@@ -442,7 +489,6 @@ Distributor::propagateDefaultDistribution(
} else {
// Should only be called at ctor time, at which point the pool is not yet running.
assert(_stripe_pool.stripe_count() == 0);
- assert(_stripes.size() == 1); // TODO STRIPE all the stripes yes
auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution));
for (auto& stripe : _stripes) {
stripe->update_distribution_config(new_configs);
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 50bd2526ff4..61a1f06309d 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -201,6 +201,7 @@ private:
ChainedMessageSender* _messageSender;
const bool _use_legacy_mode;
+ uint8_t _n_stripe_bits;
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
std::unique_ptr<DistributorStripe> _stripe;
DistributorStripePool& _stripe_pool;
std::vector<std::unique_ptr<DistributorStripe>> _stripes;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 5c6c529fe69..bf78707cfd9 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -41,7 +41,8 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
DoneInitializeHandler& doneInitHandler,
ChainedMessageSender& messageSender,
StripeHostInfoNotifier& stripe_host_info_notifier,
- bool use_legacy_mode)
+ bool use_legacy_mode,
+ uint32_t stripe_index)
: DistributorStripeInterface(),
framework::StatusReporter("distributor", "Distributor"),
_clusterStateBundle(lib::ClusterState()),
@@ -57,7 +58,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_bucketDBUpdater(_component, _component, *this, *this, use_legacy_mode),
_distributorStatusDelegate(compReg, *this, *this),
_bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater),
- _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg),
+ _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, stripe_index),
_messageSender(messageSender),
_stripe_host_info_notifier(stripe_host_info_notifier),
_externalOperationHandler(_component, _component, getMetrics(), getMessageSender(),
@@ -86,7 +87,8 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_last_db_memory_sample_time_point(),
_inhibited_maintenance_tick_count(0),
_must_send_updated_host_info(false),
- _use_legacy_mode(use_legacy_mode)
+ _use_legacy_mode(use_legacy_mode),
+ _stripe_index(stripe_index)
{
if (use_legacy_mode) {
_distributorStatusDelegate.registerStatusPage();
@@ -176,7 +178,7 @@ DistributorStripe::handle_or_enqueue_message(const std::shared_ptr<api::StorageM
return true;
}
MBUS_TRACE(msg->getTrace(), 9,
- "Distributor: Added to message queue. Thread state: "
+ vespalib::make_string("DistributorStripe[%u]: Added to message queue. Thread state: ", _stripe_index)
+ _threadPool.getStatus());
if (_use_legacy_mode) {
// TODO STRIPE remove
@@ -570,7 +572,8 @@ bool is_client_request(const api::StorageMessage& msg) noexcept {
void DistributorStripe::handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg) {
if (!handleMessage(msg)) {
- MBUS_TRACE(msg->getTrace(), 9, "Distributor: Not handling it. Sending further down.");
+ MBUS_TRACE(msg->getTrace(), 9,
+ vespalib::make_string("DistributorStripe[%u]: Not handling it. Sending further down", _stripe_index));
_messageSender.sendDown(msg);
}
}
@@ -578,10 +581,12 @@ void DistributorStripe::handle_or_propagate_message(const std::shared_ptr<api::S
void DistributorStripe::startExternalOperations() {
for (auto& msg : _fetchedMessages) {
if (is_client_request(*msg)) {
- MBUS_TRACE(msg->getTrace(), 9, "Distributor: adding to client request priority queue");
+ MBUS_TRACE(msg->getTrace(), 9,
+ vespalib::make_string("DistributorStripe[%u]: Adding to client request priority queue", _stripe_index));
_client_request_priority_queue.emplace(std::move(msg));
} else {
- MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from queue to be processed.");
+ MBUS_TRACE(msg->getTrace(), 9,
+ vespalib::make_string("DistributorStripe[%u]: Grabbed from queue to be processed", _stripe_index));
handle_or_propagate_message(msg);
}
}
@@ -589,8 +594,9 @@ void DistributorStripe::startExternalOperations() {
const bool start_single_client_request = !_client_request_priority_queue.empty();
if (start_single_client_request) {
const auto& msg = _client_request_priority_queue.top();
- MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from "
- "client request priority queue to be processed.");
+ MBUS_TRACE(msg->getTrace(), 9,
+ vespalib::make_string("DistributorStripe[%u]: Grabbed from "
+ "client request priority queue to be processed", _stripe_index));
handle_or_propagate_message(msg);
_client_request_priority_queue.pop();
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index b82b5483bd3..347863b6d77 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -64,7 +64,8 @@ public:
DoneInitializeHandler&,
ChainedMessageSender& messageSender,
StripeHostInfoNotifier& stripe_host_info_notifier,
- bool use_legacy_mode);
+ bool use_legacy_mode,
+ uint32_t stripe_index = 0);
~DistributorStripe() override;
@@ -363,6 +364,7 @@ private:
size_t _inhibited_maintenance_tick_count;
bool _must_send_updated_host_info;
bool _use_legacy_mode;
+ uint32_t _stripe_index;
};
}
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
index 7bebe4c001a..013551b8505 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
@@ -28,13 +28,17 @@ IdealStateManager::IdealStateManager(
DistributorStripeInterface& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
- DistributorComponentRegister& compReg)
+ DistributorComponentRegister& compReg,
+ uint32_t stripe_index)
: _metrics(new IdealStateMetricSet),
_distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Ideal state manager"),
_bucketSpaceRepo(bucketSpaceRepo),
_has_logged_phantom_replica_warning(false)
{
- _distributorComponent.registerMetric(*_metrics);
+ if (stripe_index == 0) {
+ // TODO STRIPE: Add proper handling of metrics across distributor stripes
+ _distributorComponent.registerMetric(*_metrics);
+ }
LOG(debug, "Adding BucketStateStateChecker to state checkers");
_stateCheckers.push_back(StateChecker::SP(new BucketStateStateChecker()));
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h
index ab7a64142f6..041e009ee9f 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.h
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.h
@@ -36,7 +36,8 @@ public:
IdealStateManager(DistributorStripeInterface& owner,
DistributorBucketSpaceRepo& bucketSpaceRepo,
DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
- DistributorComponentRegister& compReg);
+ DistributorComponentRegister& compReg,
+ uint32_t stripe_index = 0);
~IdealStateManager() override;