diff options
author | Geir Storli <geirst@verizonmedia.com> | 2021-04-22 09:18:24 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2021-04-22 09:18:24 +0000 |
commit | ef07869d1741e5a6aad3301bf5496fa2c61b5964 (patch) | |
tree | a4474aca2efb720000c5f3eec9e822e2572170e2 /storage | |
parent | 1f7363ff53144c40fd27c4332b1cb3619b1525d6 (diff) |
Decouple DistributorStripe from StorageLink.
Diffstat (limited to 'storage')
3 files changed, 16 insertions, 33 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 0f63de5cd85..b29d8458b41 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -179,7 +179,6 @@ void Distributor::onOpen() { LOG(debug, "Distributor::onOpen invoked"); - _stripe->open(); setNodeStateUp(); framework::MilliSecTime maxProcessingTime(60 * 1000); framework::MilliSecTime waitTime(1000); @@ -195,7 +194,7 @@ Distributor::onOpen() void Distributor::onClose() { LOG(debug, "Distributor::onClose invoked"); - _stripe->close(); + _stripe->flush_and_close(); if (_bucket_db_updater) { _bucket_db_updater->flush(); } @@ -249,7 +248,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) } // TODO STRIPE can we route both requests and responses that are BucketCommand|Reply based on their bucket alone? // that covers most operations already... - return _stripe->onDown(msg); + return _stripe->handle_or_enqueue_message(msg); } bool @@ -297,7 +296,7 @@ Distributor::storageDistributionChanged() } } else { // May happen from any thread. - _stripe->storageDistributionChanged(); + _stripe->storage_distribution_changed(); } } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 906129ebf96..11fb69a30c1 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -40,8 +40,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, DoneInitializeHandler& doneInitHandler, bool manageActiveBucketCopies, ChainedMessageSender& messageSender) - : StorageLink("distributor"), - DistributorStripeInterface(), + : DistributorStripeInterface(), framework::StatusReporter("distributor", "Distributor"), _clusterStateBundle(lib::ClusterState()), _bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())), @@ -114,36 +113,23 @@ DistributorStripe::sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) api::MergeBucketCommand& merge(static_cast<api::MergeBucketCommand&>(*cmd)); _idealStateManager.getMetrics().nodesPerMerge.addValue(merge.getNodes().size()); } - sendUp(cmd); + send_up_with_tracking(cmd); } void DistributorStripe::sendReply(const std::shared_ptr<api::StorageReply>& reply) { - sendUp(reply); -} - -void -DistributorStripe::onOpen() -{ - LOG(debug, "DistributorStripe::onOpen invoked"); - if (_component.getDistributorConfig().startDistributorThread) { - // TODO STRIPE own thread per stripe! - } else { - LOG(warning, "Not starting distributor stripe thread as it's not configured to " - "run. Unless you are just running a test tool, this is a " - "fatal error."); - } + send_up_with_tracking(reply); } void DistributorStripe::send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>& msg) { api::StorageReply::UP reply( std::dynamic_pointer_cast<api::StorageCommand>(msg)->makeReply()); reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Distributor is shutting down")); - sendUp(std::shared_ptr<api::StorageMessage>(reply.release())); + send_up_with_tracking(std::shared_ptr<api::StorageMessage>(reply.release())); } -void DistributorStripe::onClose() { +void DistributorStripe::flush_and_close() { for (auto& msg : _messageQueue) { if (!msg->getType().isReply()) { send_shutdown_abort_reply(msg); @@ -168,14 +154,14 @@ void DistributorStripe::send_up_without_tracking(const std::shared_ptr<api::Stor } void -DistributorStripe::sendUp(const std::shared_ptr<api::StorageMessage>& msg) +DistributorStripe::send_up_with_tracking(const std::shared_ptr<api::StorageMessage>& msg) { _pendingMessageTracker.insert(msg); send_up_without_tracking(msg); } bool -DistributorStripe::onDown(const std::shared_ptr<api::StorageMessage>& msg) +DistributorStripe::handle_or_enqueue_message(const std::shared_ptr<api::StorageMessage>& msg) { if (_externalOperationHandler.try_handle_message_outside_main_thread(msg)) { return true; @@ -400,7 +386,7 @@ void DistributorStripe::invalidate_bucket_spaces_stats() { } void -DistributorStripe::storageDistributionChanged() +DistributorStripe::storage_distribution_changed() { if (!_distribution.get() || *_component.getDistribution() != *_distribution) diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index 4a43b93354e..9756547ec1d 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -45,8 +45,7 @@ class ThrottlingOperationStarter; * TODO STRIPE add class comment. */ class DistributorStripe final - : public StorageLink, // TODO decouple - public DistributorStripeInterface, + : public DistributorStripeInterface, public StatusDelegator, public framework::StatusReporter, public framework::TickingThread, @@ -68,10 +67,9 @@ public: const ClusterContext& cluster_context() const override { return _component.cluster_context(); } - void onOpen() override; - void onClose() override; - bool onDown(const std::shared_ptr<api::StorageMessage>&) override; - void sendUp(const std::shared_ptr<api::StorageMessage>&) override; + void flush_and_close(); + bool handle_or_enqueue_message(const std::shared_ptr<api::StorageMessage>&); + void send_up_with_tracking(const std::shared_ptr<api::StorageMessage>&); // Bypasses message tracker component. Thread safe. void send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&) override; @@ -105,7 +103,7 @@ public: */ void notifyDistributionChangeEnabled() override; - void storageDistributionChanged() override; + void storage_distribution_changed(); void recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) override; |