summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-03-23 12:40:34 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-03-23 12:40:34 +0000
commitb953eaa236aabf8bc883a41fb5345272200dccf4 (patch)
tree33ec782938e9276d95ae237aa866fd2de01d7f9a /storage
parent50bec9bb93ec7502313decc1c82fd41efdfac06b (diff)
Simplify Distributor class to remove now unused functionality
In particular, remove `DistributorStripeInterface` inheritance and subclassed `DistributorComponent` usage.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp133
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h132
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h17
4 files changed, 25 insertions, 258 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 29af083cf4e..f886640d1f9 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -46,22 +46,16 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
HostInfo& hostInfoReporterRegistrar,
ChainedMessageSender* messageSender)
: StorageLink("distributor"),
- DistributorStripeInterface(),
framework::StatusReporter("distributor", "Distributor"),
_metrics(std::make_shared<DistributorMetricSet>()),
_messageSender(messageSender),
_stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, doneInitHandler,
manageActiveBucketCopies, *this)),
- // TODO STRIPE remove once DistributorStripeComponent no longer references bucket space repos
- _bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())),
- _readOnlyBucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())),
- // TODO STRIPE slim down
- _component(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, "distributor"),
+ _component(compReg, "distributor"),
_distributorStatusDelegate(compReg, *this, *this),
_threadPool(threadPool),
_tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN),
_metricUpdateHook(*this),
- _metricLock(),
_hostInfoReporter(*this, *this)
{
_component.registerMetric(*_metrics);
@@ -82,11 +76,6 @@ Distributor::isInRecoveryMode() const noexcept {
return _stripe->isInRecoveryMode();
}
-int
-Distributor::getDistributorIndex() const {
- return _component.getIndex();
-}
-
const PendingMessageTracker&
Distributor::getPendingMessageTracker() const {
return _stripe->getPendingMessageTracker();
@@ -169,27 +158,6 @@ Distributor::db_memory_sample_interval() const noexcept {
return _stripe->db_memory_sample_interval();
}
-bool Distributor::initializing() const {
- return _stripe->initializing();
-}
-
-const lib::ClusterState*
-Distributor::pendingClusterStateOrNull(const document::BucketSpace& space) const {
- return bucket_db_updater().pendingClusterStateOrNull(space);
-}
-
-void
-Distributor::sendCommand(const std::shared_ptr<api::StorageCommand>&)
-{
- assert(false); // TODO STRIPE
-}
-
-void
-Distributor::sendReply(const std::shared_ptr<api::StorageReply>&)
-{
- assert(false); // TODO STRIPE
-}
-
void
Distributor::setNodeStateUp()
{
@@ -217,22 +185,11 @@ Distributor::onOpen()
}
}
-void Distributor::send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>& msg) {
- api::StorageReply::UP reply(
- std::dynamic_pointer_cast<api::StorageCommand>(msg)->makeReply());
- reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Distributor is shutting down"));
- sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
-}
-
void Distributor::onClose() {
LOG(debug, "Distributor::onClose invoked");
_stripe->close();
}
-void Distributor::send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&) {
- assert(false);
-}
-
void
Distributor::sendUp(const std::shared_ptr<api::StorageMessage>& msg)
{
@@ -259,19 +216,6 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
return _stripe->onDown(msg);
}
-void
-Distributor::handleCompletedMerge(
- const std::shared_ptr<api::MergeBucketReply>&)
-{
- assert(false);
-}
-
-bool
-Distributor::isMaintenanceReply(const api::StorageReply&) const
-{
- assert(false);
-}
-
bool
Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply)
{
@@ -298,16 +242,6 @@ Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state)
_stripe->enableClusterStateBundle(state);
}
-OperationRoutingSnapshot Distributor::read_snapshot_for_bucket(const document::Bucket&) const {
- abort();
-}
-
-void
-Distributor::notifyDistributionChangeEnabled()
-{
- _stripe->notifyDistributionChangeEnabled();
-}
-
void
Distributor::storageDistributionChanged()
{
@@ -316,43 +250,6 @@ Distributor::storageDistributionChanged()
}
void
-Distributor::recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) {
- bucket_db_updater().recheckBucketInfo(nodeIdx, bucket);
-}
-
-namespace {
-
-class SplitChecker : public PendingMessageTracker::Checker
-{
-public:
- bool found;
- uint8_t maxPri;
-
- SplitChecker(uint8_t maxP) : found(false), maxPri(maxP) {};
-
- bool check(uint32_t msgType, uint16_t node, uint8_t pri) override {
- (void) node;
- (void) pri;
- if (msgType == api::MessageType::SPLITBUCKET_ID && pri <= maxPri) {
- found = true;
- return false;
- }
-
- return true;
- }
-};
-
-}
-
-void
-Distributor::checkBucketForSplit(document::BucketSpace,
- const BucketDatabase::Entry&,
- uint8_t)
-{
- assert(false);
-}
-
-void
Distributor::enableNextDistribution()
{
_stripe->enableNextDistribution();
@@ -366,24 +263,6 @@ Distributor::propagateDefaultDistribution(
_stripe->propagateDefaultDistribution(std::move(distribution));
}
-void
-Distributor::propagateClusterStates()
-{
- assert(false);
-}
-
-void
-Distributor::signalWorkWasDone()
-{
- _tickResult = framework::ThreadWaitInfo::MORE_WORK_ENQUEUED;
-}
-
-bool
-Distributor::workWasDone() const noexcept
-{
- return !_tickResult.waitWanted();
-}
-
std::unordered_map<uint16_t, uint32_t>
Distributor::getMinReplica() const
{
@@ -410,10 +289,6 @@ Distributor::propagateInternalScanMetricsToExternal()
_stripe->propagateInternalScanMetricsToExternal();
}
-void Distributor::maybe_update_bucket_db_memory_usage_stats() {
- assert(false);
-}
-
void
Distributor::scanAllBuckets()
{
@@ -447,12 +322,6 @@ Distributor::enableNextConfig()
_stripe->enableNextConfig(); // TODO STRIPE avoid redundant call
}
-void
-Distributor::handleStatusRequests()
-{
- assert(false);
-}
-
vespalib::string
Distributor::getReportContentType(const framework::HttpUrlPath& path) const
{
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 261343b83fd..bfffe126b44 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -12,10 +12,10 @@
#include "pendingmessagetracker.h"
#include "statusreporterdelegate.h"
#include <vespa/config/config.h>
+#include <vespa/storage/common/distributorcomponent.h>
#include <vespa/storage/common/doneinitializehandler.h>
#include <vespa/storage/common/messagesender.h>
#include <vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h>
-#include <vespa/storage/distributor/distributor_stripe_component.h>
#include <vespa/storage/distributor/maintenance/maintenancescheduler.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/metric/metricupdatehook.h>
@@ -41,14 +41,13 @@ class OwnershipTransferSafeTimePointCalculator;
class SimpleMaintenanceScanner;
class ThrottlingOperationStarter;
-class Distributor : public StorageLink,
- public DistributorStripeInterface,
- public StatusDelegator,
- public framework::StatusReporter,
- public framework::TickingThread,
- public MinReplicaProvider,
- public BucketSpacesStatsProvider,
- public NonTrackingMessageSender
+class Distributor final
+ : public StorageLink,
+ public StatusDelegator,
+ public framework::StatusReporter,
+ public framework::TickingThread,
+ public MinReplicaProvider,
+ public BucketSpacesStatsProvider
{
public:
Distributor(DistributorComponentRegister&,
@@ -61,7 +60,7 @@ public:
~Distributor() override;
- const ClusterContext& cluster_context() const override {
+ const ClusterContext& cluster_context() const {
return _component.cluster_context();
}
void onOpen() override;
@@ -69,40 +68,18 @@ public:
bool onDown(const std::shared_ptr<api::StorageMessage>&) override;
void sendUp(const std::shared_ptr<api::StorageMessage>&) override;
void sendDown(const std::shared_ptr<api::StorageMessage>&) override;
- // Bypasses message tracker component. Thread safe.
- void send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&) override;
- ChainedMessageSender& getMessageSender() override {
- abort(); // TODO STRIPE
- }
-
- DistributorMetricSet& getMetrics() override { return *_metrics; }
-
- const OperationSequencer& operation_sequencer() const noexcept override {
- abort(); // TODO STRIPE
- }
-
- const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const override;
+ DistributorMetricSet& getMetrics() { return *_metrics; }
/**
* Enables a new cluster state. Called after the bucket db updater has
* retrieved all bucket info related to the change.
*/
- void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle) override;
-
- /**
- * Invoked when a pending cluster state for a distribution (config)
- * change has been enabled. An invocation of storageDistributionChanged
- * will eventually cause this method to be called, assuming the pending
- * cluster state completed successfully.
- */
- void notifyDistributionChangeEnabled() override;
+ void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle);
void storageDistributionChanged() override;
- void recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) override;
-
- bool handleReply(const std::shared_ptr<api::StorageReply>& reply) override;
+ bool handleReply(const std::shared_ptr<api::StorageReply>& reply);
// StatusReporter implementation
vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
@@ -115,40 +92,13 @@ public:
virtual framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override;
virtual framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override;
- /**
- * Checks whether a bucket needs to be split, and sends a split
- * if so.
- */
- void checkBucketForSplit(document::BucketSpace bucketSpace,
- const BucketDatabase::Entry& e,
- uint8_t priority) override;
-
- const lib::ClusterStateBundle& getClusterStateBundle() const override;
-
- /**
- * @return Returns the states in which the distributors consider
- * storage nodes to be up.
- */
- const char* getStorageNodeUpStates() const override {
- return "uri";
- }
-
- /**
- * Called by bucket db updater after a merge has finished, and all the
- * request bucket info operations have been performed as well. Passes the
- * merge back to the operation that created it.
- */
- void handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>& reply) override;
-
- bool initializing() const override;
-
- const DistributorConfiguration& getConfig() const override;
+ const lib::ClusterStateBundle& getClusterStateBundle() const;
+ const DistributorConfiguration& getConfig() const;
bool isInRecoveryMode() const noexcept;
- int getDistributorIndex() const override;
- PendingMessageTracker& getPendingMessageTracker() override;
- const PendingMessageTracker& getPendingMessageTracker() const override;
+ PendingMessageTracker& getPendingMessageTracker();
+ const PendingMessageTracker& getPendingMessageTracker() const;
DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept;
const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept;
@@ -157,16 +107,6 @@ public:
storage::distributor::DistributorStripeComponent& distributor_component() noexcept;
- void sendCommand(const std::shared_ptr<api::StorageCommand>&) override;
- void sendReply(const std::shared_ptr<api::StorageReply>&) override;
-
- const BucketGcTimeCalculator::BucketIdHasher&
- getBucketIdHasher() const override {
- abort(); // TODO STRIPE
- }
-
- OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override;
-
class MetricUpdateHook : public framework::MetricUpdateHook
{
public:
@@ -193,12 +133,6 @@ private:
void setNodeStateUp();
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
- bool isMaintenanceReply(const api::StorageReply& reply) const;
-
- void handleStatusRequests();
- void send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>&);
- void handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg);
- void startExternalOperations();
// Accessors used by tests
BucketDBUpdater& bucket_db_updater();
@@ -223,39 +157,21 @@ private:
* Takes metric lock.
*/
void propagateInternalScanMetricsToExternal();
- void maybe_update_bucket_db_memory_usage_stats();
void scanAllBuckets();
void enableNextConfig();
- void signalWorkWasDone();
- bool workWasDone() const noexcept;
-
void enableNextDistribution();
void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>);
- void propagateClusterStates();
std::shared_ptr<DistributorMetricSet> _metrics;
- ChainedMessageSender* _messageSender;
+ ChainedMessageSender* _messageSender;
// TODO STRIPE multiple stripes...! This is for proof of concept of wiring.
- std::unique_ptr<DistributorStripe> _stripe;
-
- std::unique_ptr<DistributorBucketSpaceRepo> _bucketSpaceRepo;
- // Read-only bucket space repo with DBs that only contain buckets transiently
- // during cluster state transitions. Bucket set does not overlap that of _bucketSpaceRepo
- // and the DBs are empty during non-transition phases.
- std::unique_ptr<DistributorBucketSpaceRepo> _readOnlyBucketSpaceRepo;
- storage::distributor::DistributorStripeComponent _component;
-
- StatusReporterDelegate _distributorStatusDelegate;
-
- framework::TickingThreadPool& _threadPool;
-
- mutable std::vector<std::shared_ptr<DistributorStatus>> _statusToDo;
- mutable std::vector<std::shared_ptr<DistributorStatus>> _fetchedStatusRequests;
-
- framework::ThreadWaitInfo _tickResult;
- MetricUpdateHook _metricUpdateHook;
- mutable std::mutex _metricLock;
- DistributorHostInfoReporter _hostInfoReporter;
+ std::unique_ptr<DistributorStripe> _stripe;
+ storage::DistributorComponent _component;
+ StatusReporterDelegate _distributorStatusDelegate;
+ framework::TickingThreadPool& _threadPool;
+ framework::ThreadWaitInfo _tickResult;
+ MetricUpdateHook _metricUpdateHook;
+ DistributorHostInfoReporter _hostInfoReporter;
};
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 0a3befbbf5c..e41c7940a0d 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -73,7 +73,6 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg,
_recoveryTimeStarted(_component.getClock()),
_tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN),
_bucketIdHasher(std::make_unique<BucketGcTimeCalculator::BucketIdIdentityHasher>()),
- _metricUpdateHook(*this),
_metricLock(),
_maintenanceStats(),
_bucketSpacesStats(),
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index c6ada1f1ad7..10b3f54d834 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -188,22 +188,6 @@ public:
OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override;
- class MetricUpdateHook : public framework::MetricUpdateHook
- {
- public:
- MetricUpdateHook(DistributorStripe& self)
- : _self(self)
- {
- }
-
- void updateMetrics(const MetricLockGuard &) override {
- _self.propagateInternalScanMetricsToExternal();
- }
-
- private:
- DistributorStripe& _self;
- };
-
std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept {
return _db_memory_sample_interval;
}
@@ -335,7 +319,6 @@ private:
framework::ThreadWaitInfo _tickResult;
BucketDBMetricUpdater _bucketDBMetricUpdater;
std::unique_ptr<BucketGcTimeCalculator::BucketIdHasher> _bucketIdHasher;
- MetricUpdateHook _metricUpdateHook;
mutable std::mutex _metricLock;
/**
* Maintenance stats for last completed database scan iteration.