summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-04-22 09:18:24 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-04-22 09:18:24 +0000
commitef07869d1741e5a6aad3301bf5496fa2c61b5964 (patch)
treea4474aca2efb720000c5f3eec9e822e2572170e2 /storage
parent1f7363ff53144c40fd27c4332b1cb3619b1525d6 (diff)
Decouple DistributorStripe from StorageLink.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp30
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h12
3 files changed, 16 insertions, 33 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 0f63de5cd85..b29d8458b41 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -179,7 +179,6 @@ void
Distributor::onOpen()
{
LOG(debug, "Distributor::onOpen invoked");
- _stripe->open();
setNodeStateUp();
framework::MilliSecTime maxProcessingTime(60 * 1000);
framework::MilliSecTime waitTime(1000);
@@ -195,7 +194,7 @@ Distributor::onOpen()
void Distributor::onClose() {
LOG(debug, "Distributor::onClose invoked");
- _stripe->close();
+ _stripe->flush_and_close();
if (_bucket_db_updater) {
_bucket_db_updater->flush();
}
@@ -249,7 +248,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
}
// TODO STRIPE can we route both requests and responses that are BucketCommand|Reply based on their bucket alone?
// that covers most operations already...
- return _stripe->onDown(msg);
+ return _stripe->handle_or_enqueue_message(msg);
}
bool
@@ -297,7 +296,7 @@ Distributor::storageDistributionChanged()
}
} else {
// May happen from any thread.
- _stripe->storageDistributionChanged();
+ _stripe->storage_distribution_changed();
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 906129ebf96..11fb69a30c1 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -40,8 +40,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
DoneInitializeHandler& doneInitHandler,
bool manageActiveBucketCopies,
ChainedMessageSender& messageSender)
- : StorageLink("distributor"),
- DistributorStripeInterface(),
+ : DistributorStripeInterface(),
framework::StatusReporter("distributor", "Distributor"),
_clusterStateBundle(lib::ClusterState()),
_bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())),
@@ -114,36 +113,23 @@ DistributorStripe::sendCommand(const std::shared_ptr<api::StorageCommand>& cmd)
api::MergeBucketCommand& merge(static_cast<api::MergeBucketCommand&>(*cmd));
_idealStateManager.getMetrics().nodesPerMerge.addValue(merge.getNodes().size());
}
- sendUp(cmd);
+ send_up_with_tracking(cmd);
}
void
DistributorStripe::sendReply(const std::shared_ptr<api::StorageReply>& reply)
{
- sendUp(reply);
-}
-
-void
-DistributorStripe::onOpen()
-{
- LOG(debug, "DistributorStripe::onOpen invoked");
- if (_component.getDistributorConfig().startDistributorThread) {
- // TODO STRIPE own thread per stripe!
- } else {
- LOG(warning, "Not starting distributor stripe thread as it's not configured to "
- "run. Unless you are just running a test tool, this is a "
- "fatal error.");
- }
+ send_up_with_tracking(reply);
}
void DistributorStripe::send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>& msg) {
api::StorageReply::UP reply(
std::dynamic_pointer_cast<api::StorageCommand>(msg)->makeReply());
reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Distributor is shutting down"));
- sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
+ send_up_with_tracking(std::shared_ptr<api::StorageMessage>(reply.release()));
}
-void DistributorStripe::onClose() {
+void DistributorStripe::flush_and_close() {
for (auto& msg : _messageQueue) {
if (!msg->getType().isReply()) {
send_shutdown_abort_reply(msg);
@@ -168,14 +154,14 @@ void DistributorStripe::send_up_without_tracking(const std::shared_ptr<api::Stor
}
void
-DistributorStripe::sendUp(const std::shared_ptr<api::StorageMessage>& msg)
+DistributorStripe::send_up_with_tracking(const std::shared_ptr<api::StorageMessage>& msg)
{
_pendingMessageTracker.insert(msg);
send_up_without_tracking(msg);
}
bool
-DistributorStripe::onDown(const std::shared_ptr<api::StorageMessage>& msg)
+DistributorStripe::handle_or_enqueue_message(const std::shared_ptr<api::StorageMessage>& msg)
{
if (_externalOperationHandler.try_handle_message_outside_main_thread(msg)) {
return true;
@@ -400,7 +386,7 @@ void DistributorStripe::invalidate_bucket_spaces_stats() {
}
void
-DistributorStripe::storageDistributionChanged()
+DistributorStripe::storage_distribution_changed()
{
if (!_distribution.get()
|| *_component.getDistribution() != *_distribution)
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 4a43b93354e..9756547ec1d 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -45,8 +45,7 @@ class ThrottlingOperationStarter;
* TODO STRIPE add class comment.
*/
class DistributorStripe final
- : public StorageLink, // TODO decouple
- public DistributorStripeInterface,
+ : public DistributorStripeInterface,
public StatusDelegator,
public framework::StatusReporter,
public framework::TickingThread,
@@ -68,10 +67,9 @@ public:
const ClusterContext& cluster_context() const override {
return _component.cluster_context();
}
- void onOpen() override;
- void onClose() override;
- bool onDown(const std::shared_ptr<api::StorageMessage>&) override;
- void sendUp(const std::shared_ptr<api::StorageMessage>&) override;
+ void flush_and_close();
+ bool handle_or_enqueue_message(const std::shared_ptr<api::StorageMessage>&);
+ void send_up_with_tracking(const std::shared_ptr<api::StorageMessage>&);
// Bypasses message tracker component. Thread safe.
void send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&) override;
@@ -105,7 +103,7 @@ public:
*/
void notifyDistributionChangeEnabled() override;
- void storageDistributionChanged() override;
+ void storage_distribution_changed();
void recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) override;