diff options
31 files changed, 1590 insertions, 855 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 1b93e728a04..4ec49b5c6f8 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -13,6 +13,7 @@ #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/storage/distributor/simpleclusterinformation.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/text/stringtokenizer.h> diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 564cd2bc876..61c74a263cf 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -13,6 +13,8 @@ #include <vespa/document/test/make_bucket_space.h> #include <vespa/storage/config/config-stor-distributormanager.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> +#include <vespa/storage/distributor/distributor_status.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/distributormetricsset.h> #include <vespa/vespalib/text/stringtokenizer.h> @@ -65,8 +67,7 @@ struct DistributorTest : Test, DistributorTestUtil { } auto currentReplicaCountingMode() const noexcept { - return _distributor->_bucketDBMetricUpdater - .getMinimumReplicaCountingMode(); + return _distributor->bucket_db_metric_updater().getMinimumReplicaCountingMode(); } std::string testOp(std::shared_ptr<api::StorageMessage> msg) @@ -141,23 +142,25 @@ struct DistributorTest : Test, DistributorTestUtil { } StatusReporterDelegate& distributor_status_delegate() { - return _distributor->_distributorStatusDelegate; + // TODO STRIPE + return _distributor->_stripe->_distributorStatusDelegate; } framework::TickingThreadPool& distributor_thread_pool() { return _distributor->_threadPool; } - const std::vector<std::shared_ptr<Distributor::Status>>& distributor_status_todos() { - return _distributor->_statusToDo; + const std::vector<std::shared_ptr<DistributorStatus>>& distributor_status_todos() { + // TODO STRIPE + return _distributor->_stripe->_statusToDo; } Distributor::MetricUpdateHook distributor_metric_update_hook() { return _distributor->_metricUpdateHook; } - SimpleMaintenanceScanner::PendingMaintenanceStats& distributor_maintenance_stats() { - return _distributor->_maintenanceStats; + SimpleMaintenanceScanner::PendingMaintenanceStats distributor_maintenance_stats() { + return _distributor->pending_maintenance_stats(); } BucketSpacesStatsProvider::PerNodeBucketSpacesStats distributor_bucket_spaces_stats() { diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 2802b976256..b465edf5d16 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -4,6 +4,7 @@ #include <vespa/document/test/make_bucket_space.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/distributorcomponent.h> #include <vespa/vdslib/distribution/distribution.h> @@ -257,6 +258,7 @@ DistributorTestUtil::removeFromBucketDB(const document::BucketId& id) void DistributorTestUtil::addIdealNodes(const document::BucketId& id) { + // TODO STRIPE roundabout way of getting state bundle..! addIdealNodes(*distributor_component().getClusterStateBundle().getBaselineClusterState(), id); } @@ -338,20 +340,21 @@ DistributorTestUtil::disableBucketActivationInConfig(bool disable) BucketDBUpdater& DistributorTestUtil::getBucketDBUpdater() { - return _distributor->_bucketDBUpdater; + return _distributor->bucket_db_updater(); } IdealStateManager& DistributorTestUtil::getIdealStateManager() { - return _distributor->_idealStateManager; + return _distributor->ideal_state_manager(); } ExternalOperationHandler& DistributorTestUtil::getExternalOperationHandler() { - return _distributor->_externalOperationHandler; + return _distributor->external_operation_handler(); } storage::distributor::DistributorComponent& DistributorTestUtil::distributor_component() { - return _distributor->_component; + // TODO STRIPE tests use this to indirectly access bucket space repos/DBs! + return _distributor->distributor_component(); } bool @@ -369,6 +372,7 @@ DistributorTestUtil::tick() { DistributorConfiguration& DistributorTestUtil::getConfig() { + // TODO STRIPE avoid const cast return const_cast<DistributorConfiguration&>(_distributor->getConfig()); } diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 630d466a72e..f450f2545db 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -22,10 +22,12 @@ class Distributor; class DistributorBucketSpace; class DistributorBucketSpaceRepo; class DistributorComponent; +class DistributorStripe; class IdealStateManager; class ExternalOperationHandler; class Operation; +// TODO STRIPE rename to DistributorStripeTestUtil? class DistributorTestUtil : private DoneInitializeHandler { public: diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index a95418b0b74..1829808990a 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -8,6 +8,7 @@ #include <vespa/document/update/documentupdate.h> #include <vespa/storage/common/reindexing_constants.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/distributormetricsset.h> #include <vespa/storage/distributor/externaloperationhandler.h> diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp index fe87de5f18a..1123c354ef4 100644 --- a/storage/src/tests/distributor/getoperationtest.cpp +++ b/storage/src/tests/distributor/getoperationtest.cpp @@ -8,6 +8,7 @@ #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/externaloperationhandler.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/storage/distributor/distributormetricsset.h> #include <vespa/storage/distributor/operations/external/getoperation.h> #include <tests/distributor/distributortestutil.h> diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index cae6bf9f226..fd23dd5d656 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -3,6 +3,7 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/distributor/bucketdbupdater.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/storage/distributor/operations/idealstate/mergeoperation.h> #include <vespa/storage/distributor/operation_sequencer.h> #include <vespa/storageapi/message/stat.h> diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index bdb5eb9eb4d..c510e08ab2a 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -3,6 +3,7 @@ #include <vespa/document/repo/documenttyperepo.h> #include <vespa/storage/distributor/operations/external/putoperation.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp index 91fb560f381..02491b670c6 100644 --- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp +++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp @@ -8,6 +8,7 @@ #include <vespa/storage/distributor/operations/external/read_for_write_visitor_operation.h> #include <vespa/storage/distributor/operations/external/visitoroperation.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/storage/distributor/distributormetricsset.h> #include <vespa/storage/distributor/pendingmessagetracker.h> #include <vespa/storage/distributor/uuid_generator.h> diff --git a/storage/src/tests/distributor/removelocationtest.cpp b/storage/src/tests/distributor/removelocationtest.cpp index 779d6f60be0..7ba2995d8e3 100644 --- a/storage/src/tests/distributor/removelocationtest.cpp +++ b/storage/src/tests/distributor/removelocationtest.cpp @@ -5,6 +5,7 @@ #include <tests/distributor/distributortestutil.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/vespalib/gtest/gtest.h> using document::test::makeDocumentBucket; diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp index cc5452e2bfe..de76379e854 100644 --- a/storage/src/tests/distributor/removeoperationtest.cpp +++ b/storage/src/tests/distributor/removeoperationtest.cpp @@ -3,6 +3,7 @@ #include <iomanip> #include <tests/common/dummystoragelink.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/storageapi/message/persistence.h> #include <tests/distributor/distributortestutil.h> #include <vespa/document/test/make_document_bucket.h> diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index 314e5b27e25..2f4c386e1ed 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -7,6 +7,7 @@ #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/bucketdbupdater.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/operations/idealstate/mergeoperation.h> #include <vespa/storage/distributor/statecheckers.h> diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index 924678a6cd0..58556832f2d 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -10,6 +10,7 @@ #include <tests/distributor/distributortestutil.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/document/fieldset/fieldsets.h> #include <vespa/vespalib/gtest/gtest.h> #include <gmock/gmock.h> diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index d3a8b270ad0..ea9f0d86ac4 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -10,6 +10,7 @@ #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/operations/external/updateoperation.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/config/helper/configgetter.hpp> #include <vespa/vespalib/gtest/gtest.h> diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp index c4f72c312c7..ccbb64e8970 100644 --- a/storage/src/tests/distributor/visitoroperationtest.cpp +++ b/storage/src/tests/distributor/visitoroperationtest.cpp @@ -10,6 +10,7 @@ #include <vespa/storage/distributor/distributormetricsset.h> #include <tests/distributor/distributortestutil.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/document/fieldset/fieldsets.h> #include <vespa/document/test/make_bucket_space.h> #include <vespa/vespalib/gtest/gtest.h> diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index f3ba6af6e0c..2b5423c60e4 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -14,6 +14,8 @@ vespa_add_library(storage_distributor distributor_bucket_space_repo.cpp distributor.cpp distributor_host_info_reporter.cpp + distributor_status.cpp + distributor_stripe.cpp distributorcomponent.cpp distributormessagesender.cpp distributormetricsset.cpp diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 12fd14c260e..2a76ed5a26a 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -24,7 +24,7 @@ using document::BucketSpace; namespace storage::distributor { -BucketDBUpdater::BucketDBUpdater(Distributor& owner, +BucketDBUpdater::BucketDBUpdater(DistributorInterface& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, DistributorMessageSender& sender, diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 8fab76575e9..d80d823a7d1 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -26,7 +26,7 @@ class XmlAttribute; namespace storage::distributor { -class Distributor; +class DistributorInterface; class BucketSpaceDistributionContext; class BucketDBUpdater : public framework::StatusReporter, @@ -34,7 +34,7 @@ class BucketDBUpdater : public framework::StatusReporter, { public: using OutdatedNodesMap = dbtransition::OutdatedNodesMap; - BucketDBUpdater(Distributor& owner, + BucketDBUpdater(DistributorInterface& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, DistributorMessageSender& sender, diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index cb4336d9f1e..665478e68a2 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -3,6 +3,8 @@ #include "blockingoperationstarter.h" #include "distributor.h" #include "distributor_bucket_space.h" +#include "distributor_status.h" +#include "distributor_stripe.h" #include "distributormetricsset.h" #include "idealstatemetricsset.h" #include "operation_sequencer.h" @@ -26,45 +28,16 @@ using namespace std::chrono_literals; namespace storage::distributor { -class Distributor::Status { - const DelegatedStatusRequest& _request; - std::mutex _lock; - std::condition_variable _cond; - bool _done; - -public: - Status(const DelegatedStatusRequest& request) noexcept - : _request(request), - _lock(), - _cond(), - _done(false) - {} - - std::ostream& getStream() { - return _request.outputStream; - } - const framework::HttpUrlPath& getPath() const { - return _request.path; - } - const framework::StatusReporter& getReporter() const { - return _request.reporter; - } - - void notifyCompleted() { - { - std::lock_guard guard(_lock); - _done = true; - } - _cond.notify_all(); - } - void waitForCompletion() { - std::unique_lock guard(_lock); - while (!_done) { - _cond.wait(guard); - } - } -}; - +/* TODO STRIPE + * - need a DistributorComponent per stripe + * - or better, remove entirely! + * - probably also DistributorInterface since it's used to send + * - metrics aggregation + * - host info aggregation..!! + * - handled if Distributor getMinReplica etc delegates to stripes? + * - these are already thread safe + * - status aggregation + */ Distributor::Distributor(DistributorComponentRegister& compReg, const NodeIdentity& node_identity, framework::TickingThreadPool& threadPool, @@ -75,56 +48,27 @@ Distributor::Distributor(DistributorComponentRegister& compReg, : StorageLink("distributor"), DistributorInterface(), framework::StatusReporter("distributor", "Distributor"), - _clusterStateBundle(lib::ClusterState()), + _metrics(std::make_shared<DistributorMetricSet>()), + _messageSender(messageSender), + _stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, doneInitHandler, + manageActiveBucketCopies, *this)), + // TODO STRIPE remove once DistributorComponent no longer references bucket space repos _bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())), _readOnlyBucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())), + // TODO STRIPE slim down _component(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, "distributor"), - _metrics(std::make_shared<DistributorMetricSet>()), - _operationOwner(*this, _component.getClock()), - _maintenanceOperationOwner(*this, _component.getClock()), - _operation_sequencer(std::make_unique<OperationSequencer>()), - _pendingMessageTracker(compReg), - _bucketDBUpdater(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, *this, compReg), _distributorStatusDelegate(compReg, *this, *this), - _bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater), - _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies), - _messageSender(messageSender), - _externalOperationHandler(_component, _component, getMetrics(), getMessageSender(), - *_operation_sequencer, *this, _component, - _idealStateManager, _operationOwner), _threadPool(threadPool), - _initializingIsUp(true), - _doneInitializeHandler(doneInitHandler), - _doneInitializing(false), - _bucketPriorityDb(std::make_unique<SimpleBucketPriorityDatabase>()), - _scanner(std::make_unique<SimpleMaintenanceScanner>(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)), - _throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)), - _blockingStarter(std::make_unique<BlockingOperationStarter>(_pendingMessageTracker, *_operation_sequencer, - *_throttlingStarter)), - _scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb, *_blockingStarter)), - _schedulingMode(MaintenanceScheduler::NORMAL_SCHEDULING_MODE), - _recoveryTimeStarted(_component.getClock()), _tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN), - _bucketIdHasher(std::make_unique<BucketGcTimeCalculator::BucketIdIdentityHasher>()), _metricUpdateHook(*this), _metricLock(), - _maintenanceStats(), - _bucketSpacesStats(), - _bucketDbStats(), - _hostInfoReporter(*this, *this), - _ownershipSafeTimeCalc(std::make_unique<OwnershipTransferSafeTimePointCalculator>(0s)), // Set by config later - _db_memory_sample_interval(30s), - _last_db_memory_sample_time_point(), - _inhibited_maintenance_tick_count(0), - _must_send_updated_host_info(false) + _hostInfoReporter(*this, *this) { _component.registerMetric(*_metrics); _component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0)); _distributorStatusDelegate.registerStatusPage(); - _bucketDBStatusDelegate.registerStatusPage(); hostInfoReporterRegistrar.registerReporter(&_hostInfoReporter); propagateDefaultDistribution(_component.getDistribution()); - propagateClusterStates(); }; Distributor::~Distributor() @@ -133,37 +77,117 @@ Distributor::~Distributor() closeNextLink(); } +bool +Distributor::isInRecoveryMode() const noexcept { + return _stripe->isInRecoveryMode(); +} + int -Distributor::getDistributorIndex() const -{ +Distributor::getDistributorIndex() const { return _component.getIndex(); } const PendingMessageTracker& -Distributor::getPendingMessageTracker() const -{ - return _pendingMessageTracker; +Distributor::getPendingMessageTracker() const { + return _stripe->getPendingMessageTracker(); +} + +PendingMessageTracker& +Distributor::getPendingMessageTracker() { + return _stripe->getPendingMessageTracker(); +} + +DistributorBucketSpaceRepo& +Distributor::getBucketSpaceRepo() noexcept { + return _stripe->getBucketSpaceRepo(); +} + +const DistributorBucketSpaceRepo& +Distributor::getBucketSpaceRepo() const noexcept { + return _stripe->getBucketSpaceRepo(); +} + +DistributorBucketSpaceRepo& +Distributor::getReadOnlyBucketSpaceRepo() noexcept { + return _stripe->getReadOnlyBucketSpaceRepo(); +} + +const DistributorBucketSpaceRepo& +Distributor::getReadyOnlyBucketSpaceRepo() const noexcept { + return _stripe->getReadOnlyBucketSpaceRepo();; +} + +storage::distributor::DistributorComponent& +Distributor::distributor_component() noexcept { + // TODO STRIPE We need to grab the stripe's component since tests like to access + // these things uncomfortably directly. + return _stripe->_component; +} + +BucketDBUpdater& +Distributor::bucket_db_updater() { + return _stripe->bucket_db_updater(); +} + +const BucketDBUpdater& +Distributor::bucket_db_updater() const { + return _stripe->bucket_db_updater(); +} + +IdealStateManager& +Distributor::ideal_state_manager() { + return _stripe->ideal_state_manager(); +} + +const IdealStateManager& +Distributor::ideal_state_manager() const { + return _stripe->ideal_state_manager(); +} + +ExternalOperationHandler& +Distributor::external_operation_handler() { + return _stripe->external_operation_handler(); +} + +const ExternalOperationHandler& +Distributor::external_operation_handler() const { + return _stripe->external_operation_handler(); +} + +BucketDBMetricUpdater& +Distributor::bucket_db_metric_updater() const noexcept { + return _stripe->_bucketDBMetricUpdater; +} + +const DistributorConfiguration& +Distributor::getConfig() const { + return _stripe->getConfig(); +} + +std::chrono::steady_clock::duration +Distributor::db_memory_sample_interval() const noexcept { + return _stripe->db_memory_sample_interval(); +} + +bool Distributor::initializing() const { + return _stripe->initializing(); } const lib::ClusterState* Distributor::pendingClusterStateOrNull(const document::BucketSpace& space) const { - return _bucketDBUpdater.pendingClusterStateOrNull(space); + return bucket_db_updater().pendingClusterStateOrNull(space); } void -Distributor::sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) +Distributor::sendCommand(const std::shared_ptr<api::StorageCommand>&) { - if (cmd->getType() == api::MessageType::MERGEBUCKET) { - api::MergeBucketCommand& merge(static_cast<api::MergeBucketCommand&>(*cmd)); - _idealStateManager.getMetrics().nodesPerMerge.addValue(merge.getNodes().size()); - } - sendUp(cmd); + assert(false); // TODO STRIPE } void -Distributor::sendReply(const std::shared_ptr<api::StorageReply>& reply) +Distributor::sendReply(const std::shared_ptr<api::StorageReply>&) { - sendUp(reply); + assert(false); // TODO STRIPE } void @@ -179,6 +203,7 @@ void Distributor::onOpen() { LOG(debug, "Distributor::onOpen invoked"); + _stripe->open(); setNodeStateUp(); framework::MilliSecTime maxProcessingTime(60 * 1000); framework::MilliSecTime waitTime(1000); @@ -200,38 +225,22 @@ void Distributor::send_shutdown_abort_reply(const std::shared_ptr<api::StorageMe } void Distributor::onClose() { - for (auto& msg : _messageQueue) { - if (!msg->getType().isReply()) { - send_shutdown_abort_reply(msg); - } - } - _messageQueue.clear(); - while (!_client_request_priority_queue.empty()) { - send_shutdown_abort_reply(_client_request_priority_queue.top()); - _client_request_priority_queue.pop(); - } - LOG(debug, "Distributor::onClose invoked"); - _pendingMessageTracker.abort_deferred_tasks(); - _bucketDBUpdater.flush(); - _externalOperationHandler.close_pending(); - _operationOwner.onClose(); - _maintenanceOperationOwner.onClose(); + _stripe->close(); } -void Distributor::send_up_without_tracking(const std::shared_ptr<api::StorageMessage>& msg) { - if (_messageSender) { - _messageSender->sendUp(msg); - } else { - StorageLink::sendUp(msg); - } +void Distributor::send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&) { + assert(false); } void Distributor::sendUp(const std::shared_ptr<api::StorageMessage>& msg) { - _pendingMessageTracker.insert(msg); - send_up_without_tracking(msg); + if (_messageSender) { + _messageSender->sendUp(msg); + } else { + StorageLink::sendUp(msg); + } } void @@ -247,281 +256,72 @@ Distributor::sendDown(const std::shared_ptr<api::StorageMessage>& msg) bool Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) { - if (_externalOperationHandler.try_handle_message_outside_main_thread(msg)) { - return true; - } - framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); - MBUS_TRACE(msg->getTrace(), 9, - "Distributor: Added to message queue. Thread state: " - + _threadPool.getStatus()); - _messageQueue.push_back(msg); - guard.broadcast(); - return true; + return _stripe->onDown(msg); } void Distributor::handleCompletedMerge( - const std::shared_ptr<api::MergeBucketReply>& reply) + const std::shared_ptr<api::MergeBucketReply>&) { - _maintenanceOperationOwner.handleReply(reply); -} - -bool -Distributor::isMaintenanceReply(const api::StorageReply& reply) const -{ - switch (reply.getType().getId()) { - case api::MessageType::CREATEBUCKET_REPLY_ID: - case api::MessageType::MERGEBUCKET_REPLY_ID: - case api::MessageType::DELETEBUCKET_REPLY_ID: - case api::MessageType::REQUESTBUCKETINFO_REPLY_ID: - case api::MessageType::SPLITBUCKET_REPLY_ID: - case api::MessageType::JOINBUCKETS_REPLY_ID: - case api::MessageType::SETBUCKETSTATE_REPLY_ID: - case api::MessageType::REMOVELOCATION_REPLY_ID: - return true; - default: - return false; - } + assert(false); } bool -Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply) +Distributor::isMaintenanceReply(const api::StorageReply&) const { - document::Bucket bucket = _pendingMessageTracker.reply(*reply); - - if (reply->getResult().getResult() == api::ReturnCode::BUCKET_NOT_FOUND && - bucket.getBucketId() != document::BucketId(0) && - reply->getAddress()) - { - recheckBucketInfo(reply->getAddress()->getIndex(), bucket); - } - - if (reply->callHandler(_bucketDBUpdater, reply)) { - return true; - } - - if (_operationOwner.handleReply(reply)) { - return true; - } - - if (_maintenanceOperationOwner.handleReply(reply)) { - _scanner->prioritizeBucket(bucket); - return true; - } - - // If it's a maintenance operation reply, it's most likely a reply to an - // operation whose state was flushed from the distributor when its node - // went down in the cluster state. Just swallow the reply to avoid getting - // warnings about unhandled messages at the bottom of the link chain. - return isMaintenanceReply(*reply); + assert(false); } bool -Distributor::generateOperation( - const std::shared_ptr<api::StorageMessage>& msg, - Operation::SP& operation) +Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply) { - return _externalOperationHandler.handleMessage(msg, operation); + return _stripe->handleReply(reply); } bool Distributor::handleMessage(const std::shared_ptr<api::StorageMessage>& msg) { - if (msg->getType().isReply()) { - auto reply = std::dynamic_pointer_cast<api::StorageReply>(msg); - if (handleReply(reply)) { - return true; - } - } - - if (msg->callHandler(_bucketDBUpdater, msg)) { - return true; - } - - Operation::SP operation; - if (generateOperation(msg, operation)) { - if (operation.get()) { - _operationOwner.start(operation, msg->getPriority()); - } - return true; - } - - return false; + return _stripe->handleMessage(msg); } const lib::ClusterStateBundle& Distributor::getClusterStateBundle() const { - return _clusterStateBundle; + // TODO STRIPE must offer a single unifying state across stripes + return _stripe->getClusterStateBundle(); } void Distributor::enableClusterStateBundle(const lib::ClusterStateBundle& state) { - lib::ClusterStateBundle oldState = _clusterStateBundle; - _clusterStateBundle = state; - propagateClusterStates(); - - lib::Node myNode(lib::NodeType::DISTRIBUTOR, _component.getIndex()); - const auto &baselineState = *_clusterStateBundle.getBaselineClusterState(); - - if (!_doneInitializing && - baselineState.getNodeState(myNode).getState() == lib::State::UP) - { - _doneInitializing = true; - _doneInitializeHandler.notifyDoneInitializing(); - } - enterRecoveryMode(); - - // Clear all active messages on nodes that are down. - const uint16_t old_node_count = oldState.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE); - const uint16_t new_node_count = baselineState.getNodeCount(lib::NodeType::STORAGE); - for (uint16_t i = 0; i < std::max(old_node_count, new_node_count); ++i) { - const auto& node_state = baselineState.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState(); - if (!node_state.oneOf(getStorageNodeUpStates())) { - std::vector<uint64_t> msgIds = _pendingMessageTracker.clearMessagesForNode(i); - LOG(debug, "Node %u is down, clearing %zu pending maintenance operations", i, msgIds.size()); - - for (uint32_t j = 0; j < msgIds.size(); ++j) { - _maintenanceOperationOwner.erase(msgIds[j]); - } - } - } - - if (_bucketDBUpdater.bucketOwnershipHasChanged()) { - using TimePoint = OwnershipTransferSafeTimePointCalculator::TimePoint; - // Note: this assumes that std::chrono::system_clock and the framework - // system clock have the same epoch, which should be a reasonable - // assumption. - const auto now = TimePoint(std::chrono::milliseconds( - _component.getClock().getTimeInMillis().getTime())); - _externalOperationHandler.rejectFeedBeforeTimeReached( - _ownershipSafeTimeCalc->safeTimePoint(now)); - } + // TODO STRIPE make test injection/force-function + _stripe->enableClusterStateBundle(state); } -OperationRoutingSnapshot Distributor::read_snapshot_for_bucket(const document::Bucket& bucket) const { - return _bucketDBUpdater.read_snapshot_for_bucket(bucket); +OperationRoutingSnapshot Distributor::read_snapshot_for_bucket(const document::Bucket&) const { + abort(); } void Distributor::notifyDistributionChangeEnabled() { - LOG(debug, "Pending cluster state for distribution change has been enabled"); - // Trigger a re-scan of bucket database, just like we do when a new cluster - // state has been enabled. - enterRecoveryMode(); -} - -void -Distributor::enterRecoveryMode() -{ - LOG(debug, "Entering recovery mode"); - _schedulingMode = MaintenanceScheduler::RECOVERY_SCHEDULING_MODE; - _scanner->reset(); - _bucketDBMetricUpdater.reset(); - // TODO reset _bucketDbStats? - invalidate_bucket_spaces_stats(); - - _recoveryTimeStarted = framework::MilliSecTimer(_component.getClock()); -} - -void -Distributor::leaveRecoveryMode() -{ - if (isInRecoveryMode()) { - LOG(debug, "Leaving recovery mode"); - _metrics->recoveryModeTime.addValue( - _recoveryTimeStarted.getElapsedTimeAsDouble()); - if (_doneInitializing) { - _must_send_updated_host_info = true; - } - } - _schedulingMode = MaintenanceScheduler::NORMAL_SCHEDULING_MODE; -} - -template <typename NodeFunctor> -void Distributor::for_each_available_content_node_in(const lib::ClusterState& state, NodeFunctor&& func) { - const auto node_count = state.getNodeCount(lib::NodeType::STORAGE); - for (uint16_t i = 0; i < node_count; ++i) { - lib::Node node(lib::NodeType::STORAGE, i); - if (state.getNodeState(node).getState().oneOf("uir")) { - func(node); - } - } -} - -BucketSpacesStatsProvider::BucketSpacesStats Distributor::make_invalid_stats_per_configured_space() const { - BucketSpacesStatsProvider::BucketSpacesStats invalid_space_stats; - for (auto& space : *_bucketSpaceRepo) { - invalid_space_stats.emplace(document::FixedBucketSpaces::to_string(space.first), - BucketSpaceStats::make_invalid()); - } - return invalid_space_stats; -} - -void Distributor::invalidate_bucket_spaces_stats() { - std::lock_guard guard(_metricLock); - _bucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats(); - auto invalid_space_stats = make_invalid_stats_per_configured_space(); - - const auto& baseline = *_clusterStateBundle.getBaselineClusterState(); - for_each_available_content_node_in(baseline, [this, &invalid_space_stats](const lib::Node& node) { - _bucketSpacesStats[node.getIndex()] = invalid_space_stats; - }); + _stripe->notifyDistributionChangeEnabled(); } void Distributor::storageDistributionChanged() { - if (!_distribution.get() - || *_component.getDistribution() != *_distribution) - { - LOG(debug, - "Distribution changed to %s, must refetch bucket information", - _component.getDistribution()->toString().c_str()); - - // FIXME this is not thread safe - _nextDistribution = _component.getDistribution(); - } else { - LOG(debug, - "Got distribution change, but the distribution %s was the same as " - "before: %s", - _component.getDistribution()->toString().c_str(), - _distribution->toString().c_str()); - } + // May happen from any thread. + _stripe->storageDistributionChanged(); } void Distributor::recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) { - _bucketDBUpdater.recheckBucketInfo(nodeIdx, bucket); + bucket_db_updater().recheckBucketInfo(nodeIdx, bucket); } namespace { -class MaintenanceChecker : public PendingMessageTracker::Checker -{ -public: - bool found; - - MaintenanceChecker() : found(false) {}; - - bool check(uint32_t msgType, uint16_t node, uint8_t pri) override { - (void) node; - (void) pri; - for (uint32_t i = 0; - IdealStateOperation::MAINTENANCE_MESSAGE_TYPES[i] != 0; - ++i) - { - if (msgType == IdealStateOperation::MAINTENANCE_MESSAGE_TYPES[i]) { - found = true; - return false; - } - } - return true; - } -}; - class SplitChecker : public PendingMessageTracker::Checker { public: @@ -545,64 +345,31 @@ public: } void -Distributor::checkBucketForSplit(document::BucketSpace bucketSpace, - const BucketDatabase::Entry& e, - uint8_t priority) +Distributor::checkBucketForSplit(document::BucketSpace, + const BucketDatabase::Entry&, + uint8_t) { - if (!getConfig().doInlineSplit()) { - return; - } - - // Verify that there are no existing pending splits at the - // appropriate priority. - SplitChecker checker(priority); - for (uint32_t i = 0; i < e->getNodeCount(); ++i) { - _pendingMessageTracker.checkPendingMessages(e->getNodeRef(i).getNode(), - document::Bucket(bucketSpace, e.getBucketId()), - checker); - if (checker.found) { - return; - } - } - - Operation::SP operation = - _idealStateManager.generateInterceptingSplit(bucketSpace, e, priority); - - if (operation.get()) { - _maintenanceOperationOwner.start(operation, priority); - } + assert(false); } void Distributor::enableNextDistribution() { - if (_nextDistribution.get()) { - _distribution = _nextDistribution; - propagateDefaultDistribution(_distribution); - _nextDistribution = std::shared_ptr<lib::Distribution>(); - _bucketDBUpdater.storageDistributionChanged(); - } + _stripe->enableNextDistribution(); } +// TODO STRIPE only used by tests to directly inject new distribution config void Distributor::propagateDefaultDistribution( std::shared_ptr<const lib::Distribution> distribution) { - auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); - for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) { - repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution); - repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr); - } + _stripe->propagateDefaultDistribution(std::move(distribution)); } void Distributor::propagateClusterStates() { - for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) { - for (auto& iter : *repo) { - iter.second->setClusterState(_clusterStateBundle.getDerivedClusterState(iter.first)); - } - } + assert(false); } void @@ -617,378 +384,99 @@ Distributor::workWasDone() const noexcept return !_tickResult.waitWanted(); } -namespace { - -bool is_client_request(const api::StorageMessage& msg) noexcept { - // Despite having been converted to StorageAPI messages, the following - // set of messages are never sent to the distributor by other processes - // than clients. - switch (msg.getType().getId()) { - case api::MessageType::GET_ID: - case api::MessageType::PUT_ID: - case api::MessageType::REMOVE_ID: - case api::MessageType::VISITOR_CREATE_ID: - case api::MessageType::VISITOR_DESTROY_ID: - case api::MessageType::GETBUCKETLIST_ID: - case api::MessageType::STATBUCKET_ID: - case api::MessageType::UPDATE_ID: - case api::MessageType::REMOVELOCATION_ID: - return true; - default: - return false; - } -} - -} - -void Distributor::handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg) { - if (!handleMessage(msg)) { - MBUS_TRACE(msg->getTrace(), 9, "Distributor: Not handling it. Sending further down."); - sendDown(msg); - } -} - -void Distributor::startExternalOperations() { - for (auto& msg : _fetchedMessages) { - if (is_client_request(*msg)) { - MBUS_TRACE(msg->getTrace(), 9, "Distributor: adding to client request priority queue"); - _client_request_priority_queue.emplace(std::move(msg)); - } else { - MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from queue to be processed."); - handle_or_propagate_message(msg); - } - } - - const bool start_single_client_request = !_client_request_priority_queue.empty(); - if (start_single_client_request) { - const auto& msg = _client_request_priority_queue.top(); - MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from " - "client request priority queue to be processed."); - handle_or_propagate_message(msg); - _client_request_priority_queue.pop(); - } - - if (!_fetchedMessages.empty() || start_single_client_request) { - signalWorkWasDone(); - } - _fetchedMessages.clear(); -} - std::unordered_map<uint16_t, uint32_t> Distributor::getMinReplica() const { - std::lock_guard guard(_metricLock); - return _bucketDbStats._minBucketReplica; + // TODO STRIPE merged snapshot from all stripes + return _stripe->getMinReplica(); } BucketSpacesStatsProvider::PerNodeBucketSpacesStats Distributor::getBucketSpacesStats() const { - std::lock_guard guard(_metricLock); - return _bucketSpacesStats; -} - -void -Distributor::propagateInternalScanMetricsToExternal() -{ - std::lock_guard guard(_metricLock); - - // All shared values are written when _metricLock is held, so no races. - if (_bucketDBMetricUpdater.hasCompletedRound()) { - _bucketDbStats.propagateMetrics(_idealStateManager.getMetrics(), - getMetrics()); - _idealStateManager.getMetrics().setPendingOperations( - _maintenanceStats.global.pending); - } -} - -namespace { - -BucketSpaceStats -toBucketSpaceStats(const NodeMaintenanceStats &stats) -{ - return BucketSpaceStats(stats.total, stats.syncing + stats.copyingIn); -} - -using PerNodeBucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats; - -PerNodeBucketSpacesStats -toBucketSpacesStats(const NodeMaintenanceStatsTracker &maintenanceStats) -{ - PerNodeBucketSpacesStats result; - for (const auto &nodeEntry : maintenanceStats.perNodeStats()) { - for (const auto &bucketSpaceEntry : nodeEntry.second) { - auto bucketSpace = document::FixedBucketSpaces::to_string(bucketSpaceEntry.first); - result[nodeEntry.first][bucketSpace] = toBucketSpaceStats(bucketSpaceEntry.second); - } - } - return result; -} - -size_t spaces_with_merges_pending(const PerNodeBucketSpacesStats& stats) { - std::unordered_set<document::BucketSpace, document::BucketSpace::hash> spaces_with_pending; - for (auto& node : stats) { - for (auto& space : node.second) { - if (space.second.valid() && space.second.bucketsPending() != 0) { - // TODO avoid bucket space string roundtrip - spaces_with_pending.emplace(document::FixedBucketSpaces::from_string(space.first)); - } - } - } - return spaces_with_pending.size(); -} - -// TODO should we also trigger on !pending --> pending edge? -bool merge_no_longer_pending_edge(const PerNodeBucketSpacesStats& prev_stats, - const PerNodeBucketSpacesStats& curr_stats) { - const auto prev_pending = spaces_with_merges_pending(prev_stats); - const auto curr_pending = spaces_with_merges_pending(curr_stats); - return curr_pending < prev_pending; + // TODO STRIPE merged snapshot from all stripes + return _stripe->getBucketSpacesStats(); } +SimpleMaintenanceScanner::PendingMaintenanceStats +Distributor::pending_maintenance_stats() const { + // TODO STRIPE merged snapshot from all stripes + return _stripe->pending_maintenance_stats(); } void -Distributor::updateInternalMetricsForCompletedScan() +Distributor::propagateInternalScanMetricsToExternal() { - std::lock_guard guard(_metricLock); - - _bucketDBMetricUpdater.completeRound(); - _bucketDbStats = _bucketDBMetricUpdater.getLastCompleteStats(); - _maintenanceStats = _scanner->getPendingMaintenanceStats(); - auto new_space_stats = toBucketSpacesStats(_maintenanceStats.perNodeStats); - if (merge_no_longer_pending_edge(_bucketSpacesStats, new_space_stats)) { - _must_send_updated_host_info = true; - } - _bucketSpacesStats = std::move(new_space_stats); - maybe_update_bucket_db_memory_usage_stats(); + _stripe->propagateInternalScanMetricsToExternal(); } void Distributor::maybe_update_bucket_db_memory_usage_stats() { - auto now = _component.getClock().getMonotonicTime(); - if ((now - _last_db_memory_sample_time_point) > _db_memory_sample_interval) { - for (auto& space : *_bucketSpaceRepo) { - _bucketDBMetricUpdater.update_db_memory_usage(space.second->getBucketDatabase().memory_usage(), true); - } - for (auto& space : *_readOnlyBucketSpaceRepo) { - _bucketDBMetricUpdater.update_db_memory_usage(space.second->getBucketDatabase().memory_usage(), false); - } - _last_db_memory_sample_time_point = now; - } else { - // Reuse previous memory statistics instead of sampling new. - _bucketDBMetricUpdater.update_db_memory_usage(_bucketDbStats._mutable_db_mem_usage, true); - _bucketDBMetricUpdater.update_db_memory_usage(_bucketDbStats._read_only_db_mem_usage, false); - } + assert(false); } void Distributor::scanAllBuckets() { - enterRecoveryMode(); - while (!scanNextBucket().isDone()) {} -} - -MaintenanceScanner::ScanResult -Distributor::scanNextBucket() -{ - MaintenanceScanner::ScanResult scanResult(_scanner->scanNext()); - if (scanResult.isDone()) { - updateInternalMetricsForCompletedScan(); - leaveRecoveryMode(); - send_updated_host_info_if_required(); - _scanner->reset(); - } else { - const auto &distribution(_bucketSpaceRepo->get(scanResult.getBucketSpace()).getDistribution()); - _bucketDBMetricUpdater.visit( - scanResult.getEntry(), - distribution.getRedundancy()); - } - return scanResult; -} - -void Distributor::send_updated_host_info_if_required() { - if (_must_send_updated_host_info) { - _component.getStateUpdater().immediately_send_get_node_state_replies(); - _must_send_updated_host_info = false; - } -} - -void -Distributor::startNextMaintenanceOperation() -{ - _throttlingStarter->setMaxPendingRange(getConfig().getMinPendingMaintenanceOps(), - getConfig().getMaxPendingMaintenanceOps()); - _scheduler->tick(_schedulingMode); + _stripe->scanAllBuckets(); } framework::ThreadWaitInfo -Distributor::doCriticalTick(framework::ThreadIndex) +Distributor::doCriticalTick(framework::ThreadIndex idx) { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; - enableNextDistribution(); + // Propagates any new configs down to stripe(s) enableNextConfig(); - fetchStatusRequests(); - fetchExternalMessages(); + _stripe->doCriticalTick(idx); + _tickResult.merge(_stripe->_tickResult); return _tickResult; } framework::ThreadWaitInfo -Distributor::doNonCriticalTick(framework::ThreadIndex) +Distributor::doNonCriticalTick(framework::ThreadIndex idx) { - _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; - handleStatusRequests(); - startExternalOperations(); - if (initializing()) { - _bucketDBUpdater.resendDelayedMessages(); - return _tickResult; - } - // Ordering note: since maintenance inhibiting checks whether startExternalOperations() - // did any useful work with incoming data, this check must be performed _after_ the call. - if (!should_inhibit_current_maintenance_scan_tick()) { - scanNextBucket(); - startNextMaintenanceOperation(); - if (isInRecoveryMode()) { - signalWorkWasDone(); - } - mark_maintenance_tick_as_no_longer_inhibited(); - _bucketDBUpdater.resendDelayedMessages(); - } else { - mark_current_maintenance_tick_as_inhibited(); - } + // TODO STRIPE stripes need their own thread loops! + _stripe->doNonCriticalTick(idx); + _tickResult = _stripe->_tickResult; return _tickResult; } -bool Distributor::should_inhibit_current_maintenance_scan_tick() const noexcept { - return (workWasDone() && (_inhibited_maintenance_tick_count - < getConfig().max_consecutively_inhibited_maintenance_ticks())); -} - -void Distributor::mark_current_maintenance_tick_as_inhibited() noexcept { - ++_inhibited_maintenance_tick_count; -} - -void Distributor::mark_maintenance_tick_as_no_longer_inhibited() noexcept { - _inhibited_maintenance_tick_count = 0; -} - void Distributor::enableNextConfig() { _hostInfoReporter.enableReporting(getConfig().getEnableHostInfoReporting()); - _bucketDBMetricUpdater.setMinimumReplicaCountingMode(getConfig().getMinimumReplicaCountingMode()); - _ownershipSafeTimeCalc->setMaxClusterClockSkew(getConfig().getMaxClusterClockSkew()); - _pendingMessageTracker.setNodeBusyDuration(getConfig().getInhibitMergesOnBusyNodeDuration()); - _bucketDBUpdater.set_stale_reads_enabled(getConfig().allowStaleReadsDuringClusterStateTransitions()); - _externalOperationHandler.set_concurrent_gets_enabled( - getConfig().allowStaleReadsDuringClusterStateTransitions()); - _externalOperationHandler.set_use_weak_internal_read_consistency_for_gets( - getConfig().use_weak_internal_read_consistency_for_client_gets()); -} - -void -Distributor::fetchStatusRequests() -{ - if (_fetchedStatusRequests.empty()) { - _fetchedStatusRequests.swap(_statusToDo); - } -} - -void -Distributor::fetchExternalMessages() -{ - assert(_fetchedMessages.empty()); - _fetchedMessages.swap(_messageQueue); + _stripe->enableNextConfig(); // TODO STRIPE avoid redundant call } void Distributor::handleStatusRequests() { - uint32_t sz = _fetchedStatusRequests.size(); - for (uint32_t i = 0; i < sz; ++i) { - Status& s(*_fetchedStatusRequests[i]); - s.getReporter().reportStatus(s.getStream(), s.getPath()); - s.notifyCompleted(); - } - _fetchedStatusRequests.clear(); - if (sz > 0) { - signalWorkWasDone(); - } + assert(false); } vespalib::string Distributor::getReportContentType(const framework::HttpUrlPath& path) const { - if (path.hasAttribute("page")) { - if (path.getAttribute("page") == "buckets") { - return "text/html"; - } else { - return "application/xml"; - } - } else { - return "text/html"; - } + return _stripe->getReportContentType(path); } std::string Distributor::getActiveIdealStateOperations() const { - return _maintenanceOperationOwner.toString(); -} - -std::string -Distributor::getActiveOperations() const -{ - return _operationOwner.toString(); + return _stripe->getActiveIdealStateOperations(); } bool Distributor::reportStatus(std::ostream& out, const framework::HttpUrlPath& path) const { - if (!path.hasAttribute("page") || path.getAttribute("page") == "buckets") { - framework::PartlyHtmlStatusReporter htmlReporter(*this); - htmlReporter.reportHtmlHeader(out, path); - if (!path.hasAttribute("page")) { - out << "<a href=\"?page=pending\">Count of pending messages to " - << "storage nodes</a><br><a href=\"?page=maintenance&show=50\">" - << "List maintenance queue (adjust show parameter to see more " - << "operations, -1 for all)</a><br>\n<a href=\"?page=buckets\">" - << "List all buckets, highlight non-ideal state</a><br>\n"; - } else { - const_cast<IdealStateManager&>(_idealStateManager) - .getBucketStatus(out); - } - htmlReporter.reportHtmlFooter(out, path); - } else { - framework::PartlyXmlStatusReporter xmlReporter(*this, out, path); - using namespace vespalib::xml; - std::string page(path.getAttribute("page")); - - if (page == "pending") { - xmlReporter << XmlTag("pending") - << XmlAttribute("externalload", _operationOwner.size()) - << XmlAttribute("maintenance", - _maintenanceOperationOwner.size()) - << XmlEndTag(); - } else if (page == "maintenance") { - // Need new page - } - } - - return true; + return _stripe->reportStatus(out, path); } bool Distributor::handleStatusRequest(const DelegatedStatusRequest& request) const { - auto wrappedRequest = std::make_shared<Status>(request); - { - framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); - _statusToDo.push_back(wrappedRequest); - guard.broadcast(); - } - wrappedRequest->waitForCompletion(); - return true; + // TODO STRIPE need to aggregate status responses _across_ stripes..! + return _stripe->handleStatusRequest(request); } } diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 05340d7dcd2..c758dbd75e2 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -34,6 +34,8 @@ namespace storage::distributor { class BlockingOperationStarter; class BucketPriorityDatabase; class DistributorBucketSpaceRepo; +class DistributorStatus; +class DistributorStripe; class OperationSequencer; class OwnershipTransferSafeTimePointCalculator; class SimpleMaintenanceScanner; @@ -71,17 +73,13 @@ public: void send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&) override; ChainedMessageSender& getMessageSender() override { - return (_messageSender == 0 ? *this : *_messageSender); + abort(); // TODO STRIPE } DistributorMetricSet& getMetrics() override { return *_metrics; } - PendingMessageTracker& getPendingMessageTracker() override { - return _pendingMessageTracker; - } - const OperationSequencer& operation_sequencer() const noexcept override { - return *_operation_sequencer; + abort(); // TODO STRIPE } const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const override; @@ -112,10 +110,7 @@ public: bool handleStatusRequest(const DelegatedStatusRequest& request) const override; - uint32_t pendingMaintenanceCount() const; - std::string getActiveIdealStateOperations() const; - std::string getActiveOperations() const; virtual framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override; virtual framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override; @@ -135,7 +130,7 @@ public: * storage nodes to be up. */ const char* getStorageNodeUpStates() const override { - return _initializingIsUp ? "uri" : "ur"; + return "uri"; } /** @@ -145,42 +140,33 @@ public: */ void handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>& reply) override; - - bool initializing() const override { - return !_doneInitializing; - } + bool initializing() const override; - const DistributorConfiguration& getConfig() const override { - return _component.getTotalDistributorConfig(); - } + const DistributorConfiguration& getConfig() const override; - bool isInRecoveryMode() const { - return _schedulingMode == MaintenanceScheduler::RECOVERY_SCHEDULING_MODE; - } + bool isInRecoveryMode() const noexcept; int getDistributorIndex() const override; + PendingMessageTracker& getPendingMessageTracker() override; const PendingMessageTracker& getPendingMessageTracker() const override; + + DistributorBucketSpaceRepo& getBucketSpaceRepo() noexcept; + const DistributorBucketSpaceRepo& getBucketSpaceRepo() const noexcept; + DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept; + const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept; + + storage::distributor::DistributorComponent& distributor_component() noexcept; + void sendCommand(const std::shared_ptr<api::StorageCommand>&) override; void sendReply(const std::shared_ptr<api::StorageReply>&) override; const BucketGcTimeCalculator::BucketIdHasher& getBucketIdHasher() const override { - return *_bucketIdHasher; - } - - DistributorBucketSpaceRepo &getBucketSpaceRepo() noexcept { return *_bucketSpaceRepo; } - const DistributorBucketSpaceRepo &getBucketSpaceRepo() const noexcept { return *_bucketSpaceRepo; } - - DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept { - return *_readOnlyBucketSpaceRepo; - } - const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept { - return *_readOnlyBucketSpaceRepo; + abort(); // TODO STRIPE } OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override; - class Status; class MetricUpdateHook : public framework::MetricUpdateHook { public: @@ -197,9 +183,7 @@ public: Distributor& _self; }; - std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept { - return _db_memory_sample_interval; - } + std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; private: friend struct DistributorTest; @@ -216,59 +200,43 @@ private: void handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg); void startExternalOperations(); + // Accessors used by tests + BucketDBUpdater& bucket_db_updater(); + const BucketDBUpdater& bucket_db_updater() const; + IdealStateManager& ideal_state_manager(); + const IdealStateManager& ideal_state_manager() const; + ExternalOperationHandler& external_operation_handler(); + const ExternalOperationHandler& external_operation_handler() const; + + BucketDBMetricUpdater& bucket_db_metric_updater() const noexcept; + /** * Return a copy of the latest min replica data, see MinReplicaProvider. */ std::unordered_map<uint16_t, uint32_t> getMinReplica() const override; PerNodeBucketSpacesStats getBucketSpacesStats() const override; + SimpleMaintenanceScanner::PendingMaintenanceStats pending_maintenance_stats() const; /** * Atomically publish internal metrics to external ideal state metrics. * Takes metric lock. */ void propagateInternalScanMetricsToExternal(); - /** - * Atomically updates internal metrics (not externally visible metrics; - * these are not changed until a snapshot triggers - * propagateIdealStateMetrics()). - * - * Takes metric lock. - */ - void updateInternalMetricsForCompletedScan(); void maybe_update_bucket_db_memory_usage_stats(); void scanAllBuckets(); - MaintenanceScanner::ScanResult scanNextBucket(); - bool should_inhibit_current_maintenance_scan_tick() const noexcept; - void mark_current_maintenance_tick_as_inhibited() noexcept; - void mark_maintenance_tick_as_no_longer_inhibited() noexcept; void enableNextConfig(); - void fetchStatusRequests(); - void fetchExternalMessages(); - void startNextMaintenanceOperation(); void signalWorkWasDone(); bool workWasDone() const noexcept; - void enterRecoveryMode(); - void leaveRecoveryMode(); - - // Tries to generate an operation from the given message. Returns true - // if we either returned an operation, or the message was otherwise handled - // (for instance, wrong distribution). - bool generateOperation(const std::shared_ptr<api::StorageMessage>& msg, - Operation::SP& operation); - void enableNextDistribution(); void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); void propagateClusterStates(); - BucketSpacesStatsProvider::BucketSpacesStats make_invalid_stats_per_configured_space() const; - template <typename NodeFunctor> - void for_each_available_content_node_in(const lib::ClusterState&, NodeFunctor&&); - void invalidate_bucket_spaces_stats(); - void send_updated_host_info_if_required(); - - lib::ClusterStateBundle _clusterStateBundle; + std::shared_ptr<DistributorMetricSet> _metrics; + ChainedMessageSender* _messageSender; + // TODO STRIPE multiple stripes...! This is for proof of concept of wiring. + std::unique_ptr<DistributorStripe> _stripe; std::unique_ptr<DistributorBucketSpaceRepo> _bucketSpaceRepo; // Read-only bucket space repo with DBs that only contain buckets transiently @@ -276,74 +244,18 @@ private: // and the DBs are empty during non-transition phases. std::unique_ptr<DistributorBucketSpaceRepo> _readOnlyBucketSpaceRepo; storage::distributor::DistributorComponent _component; - std::shared_ptr<DistributorMetricSet> _metrics; - - OperationOwner _operationOwner; - OperationOwner _maintenanceOperationOwner; - std::unique_ptr<OperationSequencer> _operation_sequencer; - PendingMessageTracker _pendingMessageTracker; - BucketDBUpdater _bucketDBUpdater; StatusReporterDelegate _distributorStatusDelegate; - StatusReporterDelegate _bucketDBStatusDelegate; - IdealStateManager _idealStateManager; - ChainedMessageSender* _messageSender; - ExternalOperationHandler _externalOperationHandler; - - std::shared_ptr<lib::Distribution> _distribution; - std::shared_ptr<lib::Distribution> _nextDistribution; - using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>; - struct IndirectHigherPriority { - template <typename Lhs, typename Rhs> - bool operator()(const Lhs& lhs, const Rhs& rhs) const noexcept { - return lhs->getPriority() > rhs->getPriority(); - } - }; - using ClientRequestPriorityQueue = std::priority_queue< - std::shared_ptr<api::StorageMessage>, - std::vector<std::shared_ptr<api::StorageMessage>>, - IndirectHigherPriority - >; - MessageQueue _messageQueue; - ClientRequestPriorityQueue _client_request_priority_queue; - MessageQueue _fetchedMessages; framework::TickingThreadPool& _threadPool; - mutable std::vector<std::shared_ptr<Status>> _statusToDo; - mutable std::vector<std::shared_ptr<Status>> _fetchedStatusRequests; + mutable std::vector<std::shared_ptr<DistributorStatus>> _statusToDo; + mutable std::vector<std::shared_ptr<DistributorStatus>> _fetchedStatusRequests; - bool _initializingIsUp; - - DoneInitializeHandler& _doneInitializeHandler; - bool _doneInitializing; - - std::unique_ptr<BucketPriorityDatabase> _bucketPriorityDb; - std::unique_ptr<SimpleMaintenanceScanner> _scanner; - std::unique_ptr<ThrottlingOperationStarter> _throttlingStarter; - std::unique_ptr<BlockingOperationStarter> _blockingStarter; - std::unique_ptr<MaintenanceScheduler> _scheduler; - MaintenanceScheduler::SchedulingMode _schedulingMode; - framework::MilliSecTimer _recoveryTimeStarted; framework::ThreadWaitInfo _tickResult; - BucketDBMetricUpdater _bucketDBMetricUpdater; - std::unique_ptr<BucketGcTimeCalculator::BucketIdHasher> _bucketIdHasher; MetricUpdateHook _metricUpdateHook; mutable std::mutex _metricLock; - /** - * Maintenance stats for last completed database scan iteration. - * Access must be protected by _metricLock as it is read by metric - * manager thread but written by distributor thread. - */ - SimpleMaintenanceScanner::PendingMaintenanceStats _maintenanceStats; - BucketSpacesStatsProvider::PerNodeBucketSpacesStats _bucketSpacesStats; - BucketDBMetricUpdater::Stats _bucketDbStats; DistributorHostInfoReporter _hostInfoReporter; - std::unique_ptr<OwnershipTransferSafeTimePointCalculator> _ownershipSafeTimeCalc; - std::chrono::steady_clock::duration _db_memory_sample_interval; - std::chrono::steady_clock::time_point _last_db_memory_sample_time_point; - size_t _inhibited_maintenance_tick_count; - bool _must_send_updated_host_info; }; } diff --git a/storage/src/vespa/storage/distributor/distributor_status.cpp b/storage/src/vespa/storage/distributor/distributor_status.cpp new file mode 100644 index 00000000000..811608822d8 --- /dev/null +++ b/storage/src/vespa/storage/distributor/distributor_status.cpp @@ -0,0 +1,32 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "distributor_status.h" +#include "delegatedstatusrequest.h" + +namespace storage::distributor { + +std::ostream& DistributorStatus::getStream() { + return _request.outputStream; +} +const framework::HttpUrlPath& DistributorStatus::getPath() const { + return _request.path; +} +const framework::StatusReporter& DistributorStatus::getReporter() const { + return _request.reporter; +} + +void DistributorStatus::notifyCompleted() { + { + std::lock_guard guard(_lock); + _done = true; + } + _cond.notify_all(); +} +void DistributorStatus::waitForCompletion() { + std::unique_lock guard(_lock); + while (!_done) { + _cond.wait(guard); + } +} + +} diff --git a/storage/src/vespa/storage/distributor/distributor_status.h b/storage/src/vespa/storage/distributor/distributor_status.h new file mode 100644 index 00000000000..6783789949b --- /dev/null +++ b/storage/src/vespa/storage/distributor/distributor_status.h @@ -0,0 +1,41 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <condition_variable> +#include <iosfwd> +#include <mutex> + +namespace storage::framework { +class HttpUrlPath; +class StatusReporter; +} + +namespace storage::distributor { + +class DelegatedStatusRequest; + +// TODO STRIPE description +class DistributorStatus { + const DelegatedStatusRequest& _request; + std::mutex _lock; + std::condition_variable _cond; + bool _done; + +public: + DistributorStatus(const DelegatedStatusRequest& request) noexcept + : _request(request), + _lock(), + _cond(), + _done(false) + {} + + std::ostream& getStream(); + const framework::HttpUrlPath& getPath() const; + const framework::StatusReporter& getReporter() const; + + void notifyCompleted(); + void waitForCompletion(); +}; + +} diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp new file mode 100644 index 00000000000..4671e5ec9ca --- /dev/null +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -0,0 +1,903 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "blockingoperationstarter.h" +#include "distributor_stripe.h" +#include "distributor_status.h" +#include "distributor_bucket_space.h" +#include "distributormetricsset.h" +#include "idealstatemetricsset.h" +#include "operation_sequencer.h" +#include "ownership_transfer_safe_time_point_calculator.h" +#include "throttlingoperationstarter.h" +#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/storage/common/global_bucket_space_distribution_converter.h> +#include <vespa/storage/common/hostreporter/hostinfo.h> +#include <vespa/storage/common/node_identity.h> +#include <vespa/storage/common/nodestateupdater.h> +#include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h> +#include <vespa/storageframework/generic/status/xmlstatusreporter.h> +#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vespalib/util/memoryusage.h> +#include <algorithm> + +#include <vespa/log/log.h> +LOG_SETUP(".distributor_stripe"); + +using namespace std::chrono_literals; + +namespace storage::distributor { + +/* TODO STRIPE + * - need a DistributorComponent per stripe + * - or better, remove entirely! + * - probably also DistributorInterface since it's used to send + * - metrics aggregation + */ +DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, + DistributorMetricSet& metrics, + const NodeIdentity& node_identity, + framework::TickingThreadPool& threadPool, + DoneInitializeHandler& doneInitHandler, + bool manageActiveBucketCopies, + ChainedMessageSender& messageSender) + : StorageLink("distributor"), + DistributorInterface(), + framework::StatusReporter("distributor", "Distributor"), + _clusterStateBundle(lib::ClusterState()), + _bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())), + _readOnlyBucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(node_identity.node_index())), + _component(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, "distributor"), + _metrics(metrics), + _operationOwner(*this, _component.getClock()), + _maintenanceOperationOwner(*this, _component.getClock()), + _operation_sequencer(std::make_unique<OperationSequencer>()), + _pendingMessageTracker(compReg), + _bucketDBUpdater(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, *this, compReg), + _distributorStatusDelegate(compReg, *this, *this), + _bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater), + _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies), + _messageSender(messageSender), + _externalOperationHandler(_component, _component, getMetrics(), getMessageSender(), + *_operation_sequencer, *this, _component, + _idealStateManager, _operationOwner), + _threadPool(threadPool), + _doneInitializeHandler(doneInitHandler), + _doneInitializing(false), + _bucketPriorityDb(std::make_unique<SimpleBucketPriorityDatabase>()), + _scanner(std::make_unique<SimpleMaintenanceScanner>(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)), + _throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)), + _blockingStarter(std::make_unique<BlockingOperationStarter>(_pendingMessageTracker, *_operation_sequencer, + *_throttlingStarter)), + _scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb, *_blockingStarter)), + _schedulingMode(MaintenanceScheduler::NORMAL_SCHEDULING_MODE), + _recoveryTimeStarted(_component.getClock()), + _tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN), + _bucketIdHasher(std::make_unique<BucketGcTimeCalculator::BucketIdIdentityHasher>()), + _metricUpdateHook(*this), + _metricLock(), + _maintenanceStats(), + _bucketSpacesStats(), + _bucketDbStats(), + _ownershipSafeTimeCalc(std::make_unique<OwnershipTransferSafeTimePointCalculator>(0s)), // Set by config later + _db_memory_sample_interval(30s), + _last_db_memory_sample_time_point(), + _inhibited_maintenance_tick_count(0), + _must_send_updated_host_info(false) +{ + _bucketDBStatusDelegate.registerStatusPage(); + propagateDefaultDistribution(_component.getDistribution()); + propagateClusterStates(); +}; + +DistributorStripe::~DistributorStripe() = default; + +int +DistributorStripe::getDistributorIndex() const +{ + return _component.getIndex(); +} + +const PendingMessageTracker& +DistributorStripe::getPendingMessageTracker() const +{ + return _pendingMessageTracker; +} + +const lib::ClusterState* +DistributorStripe::pendingClusterStateOrNull(const document::BucketSpace& space) const { + return _bucketDBUpdater.pendingClusterStateOrNull(space); +} + +void +DistributorStripe::sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) +{ + if (cmd->getType() == api::MessageType::MERGEBUCKET) { + api::MergeBucketCommand& merge(static_cast<api::MergeBucketCommand&>(*cmd)); + _idealStateManager.getMetrics().nodesPerMerge.addValue(merge.getNodes().size()); + } + sendUp(cmd); +} + +void +DistributorStripe::sendReply(const std::shared_ptr<api::StorageReply>& reply) +{ + sendUp(reply); +} + +void +DistributorStripe::onOpen() +{ + LOG(debug, "DistributorStripe::onOpen invoked"); + if (_component.getDistributorConfig().startDistributorThread) { + // TODO STRIPE own thread per stripe! + } else { + LOG(warning, "Not starting distributor stripe thread as it's not configured to " + "run. Unless you are just running a test tool, this is a " + "fatal error."); + } +} + +void DistributorStripe::send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>& msg) { + api::StorageReply::UP reply( + std::dynamic_pointer_cast<api::StorageCommand>(msg)->makeReply()); + reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Distributor is shutting down")); + sendUp(std::shared_ptr<api::StorageMessage>(reply.release())); +} + +void DistributorStripe::onClose() { + for (auto& msg : _messageQueue) { + if (!msg->getType().isReply()) { + send_shutdown_abort_reply(msg); + } + } + _messageQueue.clear(); + while (!_client_request_priority_queue.empty()) { + send_shutdown_abort_reply(_client_request_priority_queue.top()); + _client_request_priority_queue.pop(); + } + + LOG(debug, "DistributorStripe::onClose invoked"); + _pendingMessageTracker.abort_deferred_tasks(); + _bucketDBUpdater.flush(); + _externalOperationHandler.close_pending(); + _operationOwner.onClose(); + _maintenanceOperationOwner.onClose(); +} + +void DistributorStripe::send_up_without_tracking(const std::shared_ptr<api::StorageMessage>& msg) { + _messageSender.sendUp(msg); +} + +void +DistributorStripe::sendUp(const std::shared_ptr<api::StorageMessage>& msg) +{ + _pendingMessageTracker.insert(msg); + send_up_without_tracking(msg); +} + +bool +DistributorStripe::onDown(const std::shared_ptr<api::StorageMessage>& msg) +{ + if (_externalOperationHandler.try_handle_message_outside_main_thread(msg)) { + return true; + } + framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); + MBUS_TRACE(msg->getTrace(), 9, + "Distributor: Added to message queue. Thread state: " + + _threadPool.getStatus()); + _messageQueue.push_back(msg); + guard.broadcast(); + return true; +} + +void +DistributorStripe::handleCompletedMerge( + const std::shared_ptr<api::MergeBucketReply>& reply) +{ + _maintenanceOperationOwner.handleReply(reply); +} + +bool +DistributorStripe::isMaintenanceReply(const api::StorageReply& reply) const +{ + switch (reply.getType().getId()) { + case api::MessageType::CREATEBUCKET_REPLY_ID: + case api::MessageType::MERGEBUCKET_REPLY_ID: + case api::MessageType::DELETEBUCKET_REPLY_ID: + case api::MessageType::REQUESTBUCKETINFO_REPLY_ID: + case api::MessageType::SPLITBUCKET_REPLY_ID: + case api::MessageType::JOINBUCKETS_REPLY_ID: + case api::MessageType::SETBUCKETSTATE_REPLY_ID: + case api::MessageType::REMOVELOCATION_REPLY_ID: + return true; + default: + return false; + } +} + +bool +DistributorStripe::handleReply(const std::shared_ptr<api::StorageReply>& reply) +{ + document::Bucket bucket = _pendingMessageTracker.reply(*reply); + + if (reply->getResult().getResult() == api::ReturnCode::BUCKET_NOT_FOUND && + bucket.getBucketId() != document::BucketId(0) && + reply->getAddress()) + { + recheckBucketInfo(reply->getAddress()->getIndex(), bucket); + } + + if (reply->callHandler(_bucketDBUpdater, reply)) { + return true; + } + + if (_operationOwner.handleReply(reply)) { + return true; + } + + if (_maintenanceOperationOwner.handleReply(reply)) { + _scanner->prioritizeBucket(bucket); + return true; + } + + // If it's a maintenance operation reply, it's most likely a reply to an + // operation whose state was flushed from the distributor when its node + // went down in the cluster state. Just swallow the reply to avoid getting + // warnings about unhandled messages at the bottom of the link chain. + return isMaintenanceReply(*reply); +} + +bool +DistributorStripe::generateOperation( + const std::shared_ptr<api::StorageMessage>& msg, + Operation::SP& operation) +{ + return _externalOperationHandler.handleMessage(msg, operation); +} + +bool +DistributorStripe::handleMessage(const std::shared_ptr<api::StorageMessage>& msg) +{ + if (msg->getType().isReply()) { + auto reply = std::dynamic_pointer_cast<api::StorageReply>(msg); + if (handleReply(reply)) { + return true; + } + } + + if (msg->callHandler(_bucketDBUpdater, msg)) { + return true; + } + + Operation::SP operation; + if (generateOperation(msg, operation)) { + if (operation.get()) { + _operationOwner.start(operation, msg->getPriority()); + } + return true; + } + + return false; +} + +const lib::ClusterStateBundle& +DistributorStripe::getClusterStateBundle() const +{ + return _clusterStateBundle; +} + +void +DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state) +{ + lib::Node my_node(lib::NodeType::DISTRIBUTOR, getDistributorIndex()); + lib::ClusterStateBundle oldState = _clusterStateBundle; + _clusterStateBundle = state; + propagateClusterStates(); + + const auto& baseline_state = *state.getBaselineClusterState(); + if (!_doneInitializing && (baseline_state.getNodeState(my_node).getState() == lib::State::UP)) { + _doneInitializing = true; + _doneInitializeHandler.notifyDoneInitializing(); + } + enterRecoveryMode(); + + // Clear all active messages on nodes that are down. + const uint16_t old_node_count = oldState.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE); + const uint16_t new_node_count = baseline_state.getNodeCount(lib::NodeType::STORAGE); + for (uint16_t i = 0; i < std::max(old_node_count, new_node_count); ++i) { + const auto& node_state = baseline_state.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState(); + if (!node_state.oneOf(getStorageNodeUpStates())) { + std::vector<uint64_t> msgIds = _pendingMessageTracker.clearMessagesForNode(i); + LOG(debug, "Node %u is down, clearing %zu pending maintenance operations", i, msgIds.size()); + + for (uint32_t j = 0; j < msgIds.size(); ++j) { + _maintenanceOperationOwner.erase(msgIds[j]); + } + } + } + + if (_bucketDBUpdater.bucketOwnershipHasChanged()) { + using TimePoint = OwnershipTransferSafeTimePointCalculator::TimePoint; + // Note: this assumes that std::chrono::system_clock and the framework + // system clock have the same epoch, which should be a reasonable + // assumption. + const auto now = TimePoint(std::chrono::milliseconds( + _component.getClock().getTimeInMillis().getTime())); + _externalOperationHandler.rejectFeedBeforeTimeReached( + _ownershipSafeTimeCalc->safeTimePoint(now)); + } +} + +OperationRoutingSnapshot DistributorStripe::read_snapshot_for_bucket(const document::Bucket& bucket) const { + return _bucketDBUpdater.read_snapshot_for_bucket(bucket); +} + +void +DistributorStripe::notifyDistributionChangeEnabled() +{ + LOG(debug, "Pending cluster state for distribution change has been enabled"); + // Trigger a re-scan of bucket database, just like we do when a new cluster + // state has been enabled. + enterRecoveryMode(); +} + +void +DistributorStripe::enterRecoveryMode() +{ + LOG(debug, "Entering recovery mode"); + _schedulingMode = MaintenanceScheduler::RECOVERY_SCHEDULING_MODE; + _scanner->reset(); + _bucketDBMetricUpdater.reset(); + // TODO reset _bucketDbStats? + invalidate_bucket_spaces_stats(); + + _recoveryTimeStarted = framework::MilliSecTimer(_component.getClock()); +} + +void +DistributorStripe::leaveRecoveryMode() +{ + if (isInRecoveryMode()) { + LOG(debug, "Leaving recovery mode"); + // FIXME don't use shared metric for this + _metrics.recoveryModeTime.addValue( + _recoveryTimeStarted.getElapsedTimeAsDouble()); + if (_doneInitializing) { + _must_send_updated_host_info = true; + } + } + _schedulingMode = MaintenanceScheduler::NORMAL_SCHEDULING_MODE; +} + +template <typename NodeFunctor> +void DistributorStripe::for_each_available_content_node_in(const lib::ClusterState& state, NodeFunctor&& func) { + const auto node_count = state.getNodeCount(lib::NodeType::STORAGE); + for (uint16_t i = 0; i < node_count; ++i) { + lib::Node node(lib::NodeType::STORAGE, i); + if (state.getNodeState(node).getState().oneOf("uir")) { + func(node); + } + } +} + +BucketSpacesStatsProvider::BucketSpacesStats DistributorStripe::make_invalid_stats_per_configured_space() const { + BucketSpacesStatsProvider::BucketSpacesStats invalid_space_stats; + for (auto& space : *_bucketSpaceRepo) { + invalid_space_stats.emplace(document::FixedBucketSpaces::to_string(space.first), + BucketSpaceStats::make_invalid()); + } + return invalid_space_stats; +} + +void DistributorStripe::invalidate_bucket_spaces_stats() { + std::lock_guard guard(_metricLock); + _bucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats(); + auto invalid_space_stats = make_invalid_stats_per_configured_space(); + + const auto& baseline = *_clusterStateBundle.getBaselineClusterState(); + for_each_available_content_node_in(baseline, [this, &invalid_space_stats](const lib::Node& node) { + _bucketSpacesStats[node.getIndex()] = invalid_space_stats; + }); +} + +void +DistributorStripe::storageDistributionChanged() +{ + if (!_distribution.get() + || *_component.getDistribution() != *_distribution) + { + LOG(debug, + "Distribution changed to %s, must refetch bucket information", + _component.getDistribution()->toString().c_str()); + + // FIXME this is not thread safe + _nextDistribution = _component.getDistribution(); + } else { + LOG(debug, + "Got distribution change, but the distribution %s was the same as " + "before: %s", + _component.getDistribution()->toString().c_str(), + _distribution->toString().c_str()); + } +} + +void +DistributorStripe::recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) { + _bucketDBUpdater.recheckBucketInfo(nodeIdx, bucket); +} + +namespace { + +class SplitChecker : public PendingMessageTracker::Checker +{ +public: + bool found; + uint8_t maxPri; + + SplitChecker(uint8_t maxP) : found(false), maxPri(maxP) {}; + + bool check(uint32_t msgType, uint16_t node, uint8_t pri) override { + (void) node; + (void) pri; + if (msgType == api::MessageType::SPLITBUCKET_ID && pri <= maxPri) { + found = true; + return false; + } + + return true; + } +}; + +} + +void +DistributorStripe::checkBucketForSplit(document::BucketSpace bucketSpace, + const BucketDatabase::Entry& e, + uint8_t priority) +{ + if (!getConfig().doInlineSplit()) { + return; + } + + // Verify that there are no existing pending splits at the + // appropriate priority. + SplitChecker checker(priority); + for (uint32_t i = 0; i < e->getNodeCount(); ++i) { + _pendingMessageTracker.checkPendingMessages(e->getNodeRef(i).getNode(), + document::Bucket(bucketSpace, e.getBucketId()), + checker); + if (checker.found) { + return; + } + } + + Operation::SP operation = + _idealStateManager.generateInterceptingSplit(bucketSpace, e, priority); + + if (operation.get()) { + _maintenanceOperationOwner.start(operation, priority); + } +} + +void +DistributorStripe::enableNextDistribution() +{ + if (_nextDistribution.get()) { + _distribution = _nextDistribution; + propagateDefaultDistribution(_distribution); + _nextDistribution = std::shared_ptr<lib::Distribution>(); + _bucketDBUpdater.storageDistributionChanged(); + } +} + +void +DistributorStripe::propagateDefaultDistribution( + std::shared_ptr<const lib::Distribution> distribution) +{ + auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); + for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) { + repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution); + repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr); + } +} + +void +DistributorStripe::propagateClusterStates() +{ + for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) { + for (auto& iter : *repo) { + iter.second->setClusterState(_clusterStateBundle.getDerivedClusterState(iter.first)); + } + } +} + +void +DistributorStripe::signalWorkWasDone() +{ + _tickResult = framework::ThreadWaitInfo::MORE_WORK_ENQUEUED; +} + +bool +DistributorStripe::workWasDone() const noexcept +{ + return !_tickResult.waitWanted(); +} + +namespace { + +bool is_client_request(const api::StorageMessage& msg) noexcept { + // Despite having been converted to StorageAPI messages, the following + // set of messages are never sent to the distributor by other processes + // than clients. + switch (msg.getType().getId()) { + case api::MessageType::GET_ID: + case api::MessageType::PUT_ID: + case api::MessageType::REMOVE_ID: + case api::MessageType::VISITOR_CREATE_ID: + case api::MessageType::VISITOR_DESTROY_ID: + case api::MessageType::GETBUCKETLIST_ID: + case api::MessageType::STATBUCKET_ID: + case api::MessageType::UPDATE_ID: + case api::MessageType::REMOVELOCATION_ID: + return true; + default: + return false; + } +} + +} + +void DistributorStripe::handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg) { + if (!handleMessage(msg)) { + MBUS_TRACE(msg->getTrace(), 9, "Distributor: Not handling it. Sending further down."); + _messageSender.sendDown(msg); + } +} + +void DistributorStripe::startExternalOperations() { + for (auto& msg : _fetchedMessages) { + if (is_client_request(*msg)) { + MBUS_TRACE(msg->getTrace(), 9, "Distributor: adding to client request priority queue"); + _client_request_priority_queue.emplace(std::move(msg)); + } else { + MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from queue to be processed."); + handle_or_propagate_message(msg); + } + } + + const bool start_single_client_request = !_client_request_priority_queue.empty(); + if (start_single_client_request) { + const auto& msg = _client_request_priority_queue.top(); + MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from " + "client request priority queue to be processed."); + handle_or_propagate_message(msg); + _client_request_priority_queue.pop(); + } + + if (!_fetchedMessages.empty() || start_single_client_request) { + signalWorkWasDone(); + } + _fetchedMessages.clear(); +} + +std::unordered_map<uint16_t, uint32_t> +DistributorStripe::getMinReplica() const +{ + std::lock_guard guard(_metricLock); + return _bucketDbStats._minBucketReplica; +} + +BucketSpacesStatsProvider::PerNodeBucketSpacesStats +DistributorStripe::getBucketSpacesStats() const +{ + std::lock_guard guard(_metricLock); + return _bucketSpacesStats; +} + +SimpleMaintenanceScanner::PendingMaintenanceStats +DistributorStripe::pending_maintenance_stats() const { + std::lock_guard guard(_metricLock); + return _maintenanceStats; +} + +void +DistributorStripe::propagateInternalScanMetricsToExternal() +{ + std::lock_guard guard(_metricLock); + + // All shared values are written when _metricLock is held, so no races. + if (_bucketDBMetricUpdater.hasCompletedRound()) { + _bucketDbStats.propagateMetrics(_idealStateManager.getMetrics(), getMetrics()); + _idealStateManager.getMetrics().setPendingOperations(_maintenanceStats.global.pending); + } +} + +namespace { + +BucketSpaceStats +toBucketSpaceStats(const NodeMaintenanceStats &stats) +{ + return BucketSpaceStats(stats.total, stats.syncing + stats.copyingIn); +} + +using PerNodeBucketSpacesStats = BucketSpacesStatsProvider::PerNodeBucketSpacesStats; + +PerNodeBucketSpacesStats +toBucketSpacesStats(const NodeMaintenanceStatsTracker &maintenanceStats) +{ + PerNodeBucketSpacesStats result; + for (const auto &nodeEntry : maintenanceStats.perNodeStats()) { + for (const auto &bucketSpaceEntry : nodeEntry.second) { + auto bucketSpace = document::FixedBucketSpaces::to_string(bucketSpaceEntry.first); + result[nodeEntry.first][bucketSpace] = toBucketSpaceStats(bucketSpaceEntry.second); + } + } + return result; +} + +size_t spaces_with_merges_pending(const PerNodeBucketSpacesStats& stats) { + std::unordered_set<document::BucketSpace, document::BucketSpace::hash> spaces_with_pending; + for (auto& node : stats) { + for (auto& space : node.second) { + if (space.second.valid() && space.second.bucketsPending() != 0) { + // TODO avoid bucket space string roundtrip + spaces_with_pending.emplace(document::FixedBucketSpaces::from_string(space.first)); + } + } + } + return spaces_with_pending.size(); +} + +// TODO should we also trigger on !pending --> pending edge? +bool merge_no_longer_pending_edge(const PerNodeBucketSpacesStats& prev_stats, + const PerNodeBucketSpacesStats& curr_stats) { + const auto prev_pending = spaces_with_merges_pending(prev_stats); + const auto curr_pending = spaces_with_merges_pending(curr_stats); + return curr_pending < prev_pending; +} + +} + +void +DistributorStripe::updateInternalMetricsForCompletedScan() +{ + std::lock_guard guard(_metricLock); + + _bucketDBMetricUpdater.completeRound(); + _bucketDbStats = _bucketDBMetricUpdater.getLastCompleteStats(); + _maintenanceStats = _scanner->getPendingMaintenanceStats(); + auto new_space_stats = toBucketSpacesStats(_maintenanceStats.perNodeStats); + if (merge_no_longer_pending_edge(_bucketSpacesStats, new_space_stats)) { + _must_send_updated_host_info = true; + } + _bucketSpacesStats = std::move(new_space_stats); + maybe_update_bucket_db_memory_usage_stats(); +} + +void DistributorStripe::maybe_update_bucket_db_memory_usage_stats() { + auto now = _component.getClock().getMonotonicTime(); + if ((now - _last_db_memory_sample_time_point) > _db_memory_sample_interval) { + for (auto& space : *_bucketSpaceRepo) { + _bucketDBMetricUpdater.update_db_memory_usage(space.second->getBucketDatabase().memory_usage(), true); + } + for (auto& space : *_readOnlyBucketSpaceRepo) { + _bucketDBMetricUpdater.update_db_memory_usage(space.second->getBucketDatabase().memory_usage(), false); + } + _last_db_memory_sample_time_point = now; + } else { + // Reuse previous memory statistics instead of sampling new. + _bucketDBMetricUpdater.update_db_memory_usage(_bucketDbStats._mutable_db_mem_usage, true); + _bucketDBMetricUpdater.update_db_memory_usage(_bucketDbStats._read_only_db_mem_usage, false); + } +} + +void +DistributorStripe::scanAllBuckets() +{ + enterRecoveryMode(); + while (!scanNextBucket().isDone()) {} +} + +MaintenanceScanner::ScanResult +DistributorStripe::scanNextBucket() +{ + MaintenanceScanner::ScanResult scanResult(_scanner->scanNext()); + if (scanResult.isDone()) { + updateInternalMetricsForCompletedScan(); + leaveRecoveryMode(); + send_updated_host_info_if_required(); + _scanner->reset(); + } else { + const auto &distribution(_bucketSpaceRepo->get(scanResult.getBucketSpace()).getDistribution()); + _bucketDBMetricUpdater.visit( + scanResult.getEntry(), + distribution.getRedundancy()); + } + return scanResult; +} + +void DistributorStripe::send_updated_host_info_if_required() { + if (_must_send_updated_host_info) { + _component.getStateUpdater().immediately_send_get_node_state_replies(); + _must_send_updated_host_info = false; + } +} + +void +DistributorStripe::startNextMaintenanceOperation() +{ + _throttlingStarter->setMaxPendingRange(getConfig().getMinPendingMaintenanceOps(), + getConfig().getMaxPendingMaintenanceOps()); + _scheduler->tick(_schedulingMode); +} + +framework::ThreadWaitInfo +DistributorStripe::doCriticalTick(framework::ThreadIndex) +{ + _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + enableNextDistribution(); + enableNextConfig(); + fetchStatusRequests(); + fetchExternalMessages(); + return _tickResult; +} + +framework::ThreadWaitInfo +DistributorStripe::doNonCriticalTick(framework::ThreadIndex) +{ + _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + handleStatusRequests(); + startExternalOperations(); + if (initializing()) { + _bucketDBUpdater.resendDelayedMessages(); + return _tickResult; + } + // Ordering note: since maintenance inhibiting checks whether startExternalOperations() + // did any useful work with incoming data, this check must be performed _after_ the call. + if (!should_inhibit_current_maintenance_scan_tick()) { + scanNextBucket(); + startNextMaintenanceOperation(); + if (isInRecoveryMode()) { + signalWorkWasDone(); + } + mark_maintenance_tick_as_no_longer_inhibited(); + _bucketDBUpdater.resendDelayedMessages(); + } else { + mark_current_maintenance_tick_as_inhibited(); + } + return _tickResult; +} + +bool DistributorStripe::should_inhibit_current_maintenance_scan_tick() const noexcept { + return (workWasDone() && (_inhibited_maintenance_tick_count + < getConfig().max_consecutively_inhibited_maintenance_ticks())); +} + +void DistributorStripe::mark_current_maintenance_tick_as_inhibited() noexcept { + ++_inhibited_maintenance_tick_count; +} + +void DistributorStripe::mark_maintenance_tick_as_no_longer_inhibited() noexcept { + _inhibited_maintenance_tick_count = 0; +} + +void +DistributorStripe::enableNextConfig() +{ + _bucketDBMetricUpdater.setMinimumReplicaCountingMode(getConfig().getMinimumReplicaCountingMode()); + _ownershipSafeTimeCalc->setMaxClusterClockSkew(getConfig().getMaxClusterClockSkew()); + _pendingMessageTracker.setNodeBusyDuration(getConfig().getInhibitMergesOnBusyNodeDuration()); + _bucketDBUpdater.set_stale_reads_enabled(getConfig().allowStaleReadsDuringClusterStateTransitions()); + _externalOperationHandler.set_concurrent_gets_enabled( + getConfig().allowStaleReadsDuringClusterStateTransitions()); + _externalOperationHandler.set_use_weak_internal_read_consistency_for_gets( + getConfig().use_weak_internal_read_consistency_for_client_gets()); +} + +void +DistributorStripe::fetchStatusRequests() +{ + if (_fetchedStatusRequests.empty()) { + _fetchedStatusRequests.swap(_statusToDo); + } +} + +void +DistributorStripe::fetchExternalMessages() +{ + assert(_fetchedMessages.empty()); + _fetchedMessages.swap(_messageQueue); +} + +void +DistributorStripe::handleStatusRequests() +{ + uint32_t sz = _fetchedStatusRequests.size(); + for (uint32_t i = 0; i < sz; ++i) { + auto& s = *_fetchedStatusRequests[i]; + s.getReporter().reportStatus(s.getStream(), s.getPath()); + s.notifyCompleted(); + } + _fetchedStatusRequests.clear(); + if (sz > 0) { + signalWorkWasDone(); + } +} + +vespalib::string +DistributorStripe::getReportContentType(const framework::HttpUrlPath& path) const +{ + if (path.hasAttribute("page")) { + if (path.getAttribute("page") == "buckets") { + return "text/html"; + } else { + return "application/xml"; + } + } else { + return "text/html"; + } +} + +std::string +DistributorStripe::getActiveIdealStateOperations() const +{ + return _maintenanceOperationOwner.toString(); +} + +std::string +DistributorStripe::getActiveOperations() const +{ + return _operationOwner.toString(); +} + +bool +DistributorStripe::reportStatus(std::ostream& out, + const framework::HttpUrlPath& path) const +{ + if (!path.hasAttribute("page") || path.getAttribute("page") == "buckets") { + framework::PartlyHtmlStatusReporter htmlReporter(*this); + htmlReporter.reportHtmlHeader(out, path); + if (!path.hasAttribute("page")) { + out << "<a href=\"?page=pending\">Count of pending messages to " + << "storage nodes</a><br><a href=\"?page=maintenance&show=50\">" + << "List maintenance queue (adjust show parameter to see more " + << "operations, -1 for all)</a><br>\n<a href=\"?page=buckets\">" + << "List all buckets, highlight non-ideal state</a><br>\n"; + } else { + const_cast<IdealStateManager&>(_idealStateManager) + .getBucketStatus(out); + } + htmlReporter.reportHtmlFooter(out, path); + } else { + framework::PartlyXmlStatusReporter xmlReporter(*this, out, path); + using namespace vespalib::xml; + std::string page(path.getAttribute("page")); + + if (page == "pending") { + xmlReporter << XmlTag("pending") + << XmlAttribute("externalload", _operationOwner.size()) + << XmlAttribute("maintenance", + _maintenanceOperationOwner.size()) + << XmlEndTag(); + } else if (page == "maintenance") { + // Need new page + } + } + + return true; +} + +bool +DistributorStripe::handleStatusRequest(const DelegatedStatusRequest& request) const +{ + auto wrappedRequest = std::make_shared<DistributorStatus>(request); + { + framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); + _statusToDo.push_back(wrappedRequest); + guard.broadcast(); + } + wrappedRequest->waitForCompletion(); + return true; +} + +} diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h new file mode 100644 index 00000000000..dbf899a6de2 --- /dev/null +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -0,0 +1,352 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "bucket_spaces_stats_provider.h" +#include "bucketdbupdater.h" +#include "distributor_host_info_reporter.h" +#include "distributorinterface.h" +#include "externaloperationhandler.h" +#include "idealstatemanager.h" +#include "min_replica_provider.h" +#include "pendingmessagetracker.h" +#include "statusreporterdelegate.h" +#include <vespa/config/config.h> +#include <vespa/storage/common/doneinitializehandler.h> +#include <vespa/storage/common/messagesender.h> +#include <vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h> +#include <vespa/storage/distributor/distributorcomponent.h> +#include <vespa/storage/distributor/maintenance/maintenancescheduler.h> +#include <vespa/storageapi/message/state.h> +#include <vespa/storageframework/generic/metric/metricupdatehook.h> +#include <vespa/storageframework/generic/thread/tickingthread.h> +#include <queue> +#include <unordered_map> + +namespace storage { + struct DoneInitializeHandler; + class HostInfo; + class NodeIdentity; +} + +namespace storage::distributor { + +class BlockingOperationStarter; +class BucketPriorityDatabase; +class DistributorStatus; +class DistributorBucketSpaceRepo; +class OperationSequencer; +class OwnershipTransferSafeTimePointCalculator; +class SimpleMaintenanceScanner; +class ThrottlingOperationStarter; + +class DistributorStripe final + : public StorageLink, // TODO decouple + public DistributorInterface, + public StatusDelegator, + public framework::StatusReporter, + public framework::TickingThread, + public MinReplicaProvider, + public BucketSpacesStatsProvider, + public NonTrackingMessageSender +{ +public: + DistributorStripe(DistributorComponentRegister&, + DistributorMetricSet& metrics, + const NodeIdentity& node_identity, + framework::TickingThreadPool&, + DoneInitializeHandler&, + bool manageActiveBucketCopies, + ChainedMessageSender& messageSender); + + ~DistributorStripe() override; + + const ClusterContext& cluster_context() const override { + return _component.cluster_context(); + } + void onOpen() override; + void onClose() override; + bool onDown(const std::shared_ptr<api::StorageMessage>&) override; + void sendUp(const std::shared_ptr<api::StorageMessage>&) override; + // Bypasses message tracker component. Thread safe. + void send_up_without_tracking(const std::shared_ptr<api::StorageMessage>&) override; + + ChainedMessageSender& getMessageSender() override { + return _messageSender; + } + + DistributorMetricSet& getMetrics() override { return _metrics; } + + PendingMessageTracker& getPendingMessageTracker() override { + return _pendingMessageTracker; + } + + const OperationSequencer& operation_sequencer() const noexcept override { + return *_operation_sequencer; + } + + const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const override; + + /** + * Enables a new cluster state. Called after the bucket db updater has + * retrieved all bucket info related to the change. + */ + void enableClusterStateBundle(const lib::ClusterStateBundle& clusterStateBundle) override; + + /** + * Invoked when a pending cluster state for a distribution (config) + * change has been enabled. An invocation of storageDistributionChanged + * will eventually cause this method to be called, assuming the pending + * cluster state completed successfully. + */ + void notifyDistributionChangeEnabled() override; + + void storageDistributionChanged() override; + + void recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) override; + + bool handleReply(const std::shared_ptr<api::StorageReply>& reply) override; + + // StatusReporter implementation + vespalib::string getReportContentType(const framework::HttpUrlPath&) const override; + bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override; + + bool handleStatusRequest(const DelegatedStatusRequest& request) const override; + + std::string getActiveIdealStateOperations() const; + std::string getActiveOperations() const; + + virtual framework::ThreadWaitInfo doCriticalTick(framework::ThreadIndex) override; + virtual framework::ThreadWaitInfo doNonCriticalTick(framework::ThreadIndex) override; + + /** + * Checks whether a bucket needs to be split, and sends a split + * if so. + */ + void checkBucketForSplit(document::BucketSpace bucketSpace, + const BucketDatabase::Entry& e, + uint8_t priority) override; + + const lib::ClusterStateBundle& getClusterStateBundle() const override; + + /** + * @return Returns the states in which the distributors consider + * storage nodes to be up. + */ + const char* getStorageNodeUpStates() const override { + return "uri"; + } + + /** + * Called by bucket db updater after a merge has finished, and all the + * request bucket info operations have been performed as well. Passes the + * merge back to the operation that created it. + */ + void handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>& reply) override; + + bool initializing() const override { + return !_doneInitializing; + } + + const DistributorConfiguration& getConfig() const override { + return _component.getTotalDistributorConfig(); + } + + bool isInRecoveryMode() const noexcept { + return _schedulingMode == MaintenanceScheduler::RECOVERY_SCHEDULING_MODE; + } + + int getDistributorIndex() const override; + const PendingMessageTracker& getPendingMessageTracker() const override; + void sendCommand(const std::shared_ptr<api::StorageCommand>&) override; + void sendReply(const std::shared_ptr<api::StorageReply>&) override; + + const BucketGcTimeCalculator::BucketIdHasher& + getBucketIdHasher() const override { + return *_bucketIdHasher; + } + + BucketDBUpdater& bucket_db_updater() { return _bucketDBUpdater; } + const BucketDBUpdater& bucket_db_updater() const { return _bucketDBUpdater; } + IdealStateManager& ideal_state_manager() { return _idealStateManager; } + const IdealStateManager& ideal_state_manager() const { return _idealStateManager; } + ExternalOperationHandler& external_operation_handler() { return _externalOperationHandler; } + const ExternalOperationHandler& external_operation_handler() const { return _externalOperationHandler; } + + DistributorBucketSpaceRepo &getBucketSpaceRepo() noexcept { return *_bucketSpaceRepo; } + const DistributorBucketSpaceRepo &getBucketSpaceRepo() const noexcept { return *_bucketSpaceRepo; } + + DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept { + return *_readOnlyBucketSpaceRepo; + } + const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept { + return *_readOnlyBucketSpaceRepo; + } + + OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override; + + class MetricUpdateHook : public framework::MetricUpdateHook + { + public: + MetricUpdateHook(DistributorStripe& self) + : _self(self) + { + } + + void updateMetrics(const MetricLockGuard &) override { + _self.propagateInternalScanMetricsToExternal(); + } + + private: + DistributorStripe& _self; + }; + + std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept { + return _db_memory_sample_interval; + } + +private: + friend struct DistributorTest; + friend class BucketDBUpdaterTest; + friend class DistributorTestUtil; + friend class MetricUpdateHook; + friend class Distributor; + + bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); + bool isMaintenanceReply(const api::StorageReply& reply) const; + + void handleStatusRequests(); + void send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>&); + void handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg); + void startExternalOperations(); + + /** + * Return a copy of the latest min replica data, see MinReplicaProvider. + */ + std::unordered_map<uint16_t, uint32_t> getMinReplica() const override; + + PerNodeBucketSpacesStats getBucketSpacesStats() const override; + + SimpleMaintenanceScanner::PendingMaintenanceStats pending_maintenance_stats() const; + + /** + * Atomically publish internal metrics to external ideal state metrics. + * Takes metric lock. + */ + void propagateInternalScanMetricsToExternal(); + /** + * Atomically updates internal metrics (not externally visible metrics; + * these are not changed until a snapshot triggers + * propagateIdealStateMetrics()). + * + * Takes metric lock. + */ + void updateInternalMetricsForCompletedScan(); + void maybe_update_bucket_db_memory_usage_stats(); + void scanAllBuckets(); + MaintenanceScanner::ScanResult scanNextBucket(); + bool should_inhibit_current_maintenance_scan_tick() const noexcept; + void mark_current_maintenance_tick_as_inhibited() noexcept; + void mark_maintenance_tick_as_no_longer_inhibited() noexcept; + void enableNextConfig(); + void fetchStatusRequests(); + void fetchExternalMessages(); + void startNextMaintenanceOperation(); + void signalWorkWasDone(); + bool workWasDone() const noexcept; + + void enterRecoveryMode(); + void leaveRecoveryMode(); + + // Tries to generate an operation from the given message. Returns true + // if we either returned an operation, or the message was otherwise handled + // (for instance, wrong distribution). + bool generateOperation(const std::shared_ptr<api::StorageMessage>& msg, + Operation::SP& operation); + + void enableNextDistribution(); + void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); + void propagateClusterStates(); + + BucketSpacesStatsProvider::BucketSpacesStats make_invalid_stats_per_configured_space() const; + template <typename NodeFunctor> + void for_each_available_content_node_in(const lib::ClusterState&, NodeFunctor&&); + void invalidate_bucket_spaces_stats(); + void send_updated_host_info_if_required(); + + lib::ClusterStateBundle _clusterStateBundle; + + std::unique_ptr<DistributorBucketSpaceRepo> _bucketSpaceRepo; + // Read-only bucket space repo with DBs that only contain buckets transiently + // during cluster state transitions. Bucket set does not overlap that of _bucketSpaceRepo + // and the DBs are empty during non-transition phases. + std::unique_ptr<DistributorBucketSpaceRepo> _readOnlyBucketSpaceRepo; + storage::distributor::DistributorComponent _component; + DistributorMetricSet& _metrics; + + OperationOwner _operationOwner; + OperationOwner _maintenanceOperationOwner; + + std::unique_ptr<OperationSequencer> _operation_sequencer; + PendingMessageTracker _pendingMessageTracker; + BucketDBUpdater _bucketDBUpdater; + StatusReporterDelegate _distributorStatusDelegate; + StatusReporterDelegate _bucketDBStatusDelegate; + IdealStateManager _idealStateManager; + ChainedMessageSender& _messageSender; + ExternalOperationHandler _externalOperationHandler; + + std::shared_ptr<lib::Distribution> _distribution; + std::shared_ptr<lib::Distribution> _nextDistribution; + + using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>; + struct IndirectHigherPriority { + template <typename Lhs, typename Rhs> + bool operator()(const Lhs& lhs, const Rhs& rhs) const noexcept { + return lhs->getPriority() > rhs->getPriority(); + } + }; + using ClientRequestPriorityQueue = std::priority_queue< + std::shared_ptr<api::StorageMessage>, + std::vector<std::shared_ptr<api::StorageMessage>>, + IndirectHigherPriority + >; + MessageQueue _messageQueue; + ClientRequestPriorityQueue _client_request_priority_queue; + MessageQueue _fetchedMessages; + framework::TickingThreadPool& _threadPool; + + mutable std::vector<std::shared_ptr<DistributorStatus>> _statusToDo; + mutable std::vector<std::shared_ptr<DistributorStatus>> _fetchedStatusRequests; + + DoneInitializeHandler& _doneInitializeHandler; + bool _doneInitializing; + + std::unique_ptr<BucketPriorityDatabase> _bucketPriorityDb; + std::unique_ptr<SimpleMaintenanceScanner> _scanner; + std::unique_ptr<ThrottlingOperationStarter> _throttlingStarter; + std::unique_ptr<BlockingOperationStarter> _blockingStarter; + std::unique_ptr<MaintenanceScheduler> _scheduler; + MaintenanceScheduler::SchedulingMode _schedulingMode; + framework::MilliSecTimer _recoveryTimeStarted; + framework::ThreadWaitInfo _tickResult; + BucketDBMetricUpdater _bucketDBMetricUpdater; + std::unique_ptr<BucketGcTimeCalculator::BucketIdHasher> _bucketIdHasher; + MetricUpdateHook _metricUpdateHook; + mutable std::mutex _metricLock; + /** + * Maintenance stats for last completed database scan iteration. + * Access must be protected by _metricLock as it is read by metric + * manager thread but written by distributor thread. + */ + SimpleMaintenanceScanner::PendingMaintenanceStats _maintenanceStats; + BucketSpacesStatsProvider::PerNodeBucketSpacesStats _bucketSpacesStats; + BucketDBMetricUpdater::Stats _bucketDbStats; + std::unique_ptr<OwnershipTransferSafeTimePointCalculator> _ownershipSafeTimeCalc; + std::chrono::steady_clock::duration _db_memory_sample_interval; + std::chrono::steady_clock::time_point _last_db_memory_sample_time_point; + size_t _inhibited_maintenance_tick_count; + bool _must_send_updated_host_info; +}; + +} diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp index 3f6c125bbfa..e5fe3c6c43c 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp +++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp @@ -139,9 +139,9 @@ class UpdateBucketDatabaseProcessor : public BucketDatabase::EntryUpdateProcesso bool _reset_trusted; public: UpdateBucketDatabaseProcessor(const framework::Clock& clock, const std::vector<BucketCopy>& changed_nodes, std::vector<uint16_t> ideal_nodes, bool reset_trusted); - virtual ~UpdateBucketDatabaseProcessor(); - virtual BucketDatabase::Entry create_entry(const document::BucketId& bucket) const override; - virtual bool process_entry(BucketDatabase::Entry &entry) const override; + ~UpdateBucketDatabaseProcessor() override; + BucketDatabase::Entry create_entry(const document::BucketId& bucket) const override; + bool process_entry(BucketDatabase::Entry &entry) const override; }; UpdateBucketDatabaseProcessor::UpdateBucketDatabaseProcessor(const framework::Clock& clock, const std::vector<BucketCopy>& changed_nodes, std::vector<uint16_t> ideal_nodes, bool reset_trusted) @@ -291,11 +291,6 @@ DistributorComponent::createAppropriateBucket(const document::Bucket &bucket) } bool -DistributorComponent::initializing() const { - return _distributor.initializing(); -} - -bool DistributorComponent::has_pending_message(uint16_t node_index, const document::Bucket& bucket, uint32_t message_type) const diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h index ca953ed01ef..6a3620cbcf7 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.h +++ b/storage/src/vespa/storage/distributor/distributorcomponent.h @@ -139,11 +139,6 @@ public: */ BucketDatabase::Entry createAppropriateBucket(const document::Bucket &bucket); - /** - * Returns true if the node is currently initializing. - */ - bool initializing() const; - // Implements DistributorNodeContext const framework::Clock& clock() const noexcept override { return getClock(); } const vespalib::string * cluster_name_ptr() const noexcept override { return cluster_context().cluster_name_ptr(); } diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp index 6dbe6b1c2a5..c92f6a3dfcf 100644 --- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp +++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp @@ -25,7 +25,7 @@ namespace storage { namespace distributor { IdealStateManager::IdealStateManager( - Distributor& owner, + DistributorInterface& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, DistributorComponentRegister& compReg, diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h index 7ed28d845d7..19f88334889 100644 --- a/storage/src/vespa/storage/distributor/idealstatemanager.h +++ b/storage/src/vespa/storage/distributor/idealstatemanager.h @@ -11,7 +11,7 @@ namespace storage::distributor { class IdealStateMetricSet; class IdealStateOperation; -class Distributor; +class DistributorInterface; class SplitBucketStateChecker; /** @@ -34,7 +34,7 @@ class IdealStateManager : public framework::HtmlStatusReporter, { public: - IdealStateManager(Distributor& owner, + IdealStateManager(DistributorInterface& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, DistributorComponentRegister& compReg, diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index d80666518ee..7762918405d 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -167,18 +167,17 @@ FileStorHandlerImpl::flush(bool killPendingMerges) if (killPendingMerges) { api::ReturnCode code(api::ReturnCode::ABORTED, "Storage node is shutting down"); - for (auto & entry : _mergeStates) - { + for (auto & entry : _mergeStates) { MergeStatus& s(*entry.second); - if (s.pendingGetDiff.get() != 0) { + if (s.pendingGetDiff) { s.pendingGetDiff->setResult(code); _messageSender.sendReply(s.pendingGetDiff); } - if (s.pendingApplyDiff.get() != 0) { + if (s.pendingApplyDiff) { s.pendingApplyDiff->setResult(code); _messageSender.sendReply(s.pendingApplyDiff); } - if (s.reply.get() != 0) { + if (s.reply) { s.reply->setResult(code); _messageSender.sendReply(s.reply); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 83f268358cb..f1063bf9c10 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -835,18 +835,12 @@ void FileStorManager::onFlush(bool downwards) LOG(debug, "flushed thread[%s]", thread->getThread().getId().c_str()); } } - uint32_t queueSize = _filestorHandler->getQueueSize(); - std::ostringstream ost; - if (queueSize > 0) { - ost << "Queue size " << queueSize; - } - std::string result = ost.str(); - if (result.size() > 0) { + uint32_t queue_size = _filestorHandler->getQueueSize(); + if (queue_size > 0) { LOG(error, "Operations in persistence layer after flush. This is ok " "during load, but should not happen when flush is called " "during shutdown as load then is supposed to have been " - "stopped: %s", - result.c_str()); + "stopped: Queue size is %u", queue_size); } StorageLinkQueued::onFlush(downwards); LOG(debug, "Done Flushing"); diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp index 4c075f44d35..db4cddce032 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp @@ -112,6 +112,7 @@ void SharedRpcResources::shutdown() { _slobrok_register->unregisterName(_handle); } _transport->ShutDown(true); + // FIXME need to reset to break weak_ptrs? But ShutDown should already sync pending resolves...! _shutdown = true; } |