diff options
author | Geir Storli <geirst@verizonmedia.com> | 2021-03-17 23:14:53 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-03-17 23:14:53 +0100 |
commit | 841d8fd3093b4794eaedb6fcfd29b2ab152f618a (patch) | |
tree | 793da24d970690fc81aac13e32c81be68544a497 | |
parent | c8ef45681f5fad722de427aba9368b81fd85f63d (diff) | |
parent | 06b5a80a3419c12f00500a998558f5253b08e0aa (diff) |
Merge pull request #17013 from vespa-engine/geirst/bucket-db-updater-refactoring
Bucket db updater refactoring
8 files changed, 99 insertions, 81 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 22d9c945262..1b93e728a04 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -72,7 +72,7 @@ public: lib::ClusterStateBundle clusterStateBundle(baselineClusterState); ClusterInformation::CSP clusterInfo( new SimpleClusterInformation( - getBucketDBUpdater().getDistributorComponent().getIndex(), + getBucketDBUpdater().node_context().node_index(), clusterStateBundle, "ui")); for (auto* repo : {&mutable_repo(), &read_only_repo()}) { @@ -1966,7 +1966,7 @@ TEST_F(BucketDBUpdaterTest, newer_mutations_not_overwritten_by_earlier_bucket_fe document::BucketId bucket(16, 1); constexpr uint64_t insertionTimestamp = 1001ULL * 1000000; api::BucketInfo wantedInfo(5, 6, 7); - getBucketDBUpdater().getDistributorComponent().updateBucketDatabase( + getBucketDBUpdater().operation_context().update_bucket_database( makeDocumentBucket(bucket), BucketCopy(insertionTimestamp, 0, wantedInfo), DatabaseUpdate::CREATE_IF_NONEXISTING); diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 17a9b4f5127..12fd14c260e 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -31,17 +31,26 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner, DistributorComponentRegister& compReg) : framework::StatusReporter("bucketdb", "Bucket DB Updater"), _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"), + _node_ctx(_distributorComponent), + _op_ctx(_distributorComponent), + _distributor_interface(_distributorComponent.getDistributor()), + _delayedRequests(), + _sentMessages(), + _pendingClusterState(), + _history(), _sender(sender), - _transitionTimer(_distributorComponent.getClock()), + _enqueuedRechecks(), + _outdatedNodesMap(), + _transitionTimer(_node_ctx.clock()), _stale_reads_enabled(false), _active_distribution_contexts(), _explicit_transition_read_guard(), _distribution_context_mutex() { - for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + for (auto& elem : _op_ctx.bucket_space_repo()) { _active_distribution_contexts.emplace( elem.first, - BucketSpaceDistributionContext::make_not_yet_initialized(_distributorComponent.getIndex())); + BucketSpaceDistributionContext::make_not_yet_initialized(_node_ctx.node_index())); _explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>()); } } @@ -62,8 +71,8 @@ OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const documen return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second); } const auto& space_repo = bucket_present_in_mutable_db - ? _distributorComponent.getBucketSpaceRepo() - : _distributorComponent.getReadOnlyBucketSpaceRepo(); + ? _op_ctx.bucket_space_repo() + : _op_ctx.read_only_bucket_space_repo(); auto existing_guard_iter = _explicit_transition_read_guard.find(bucket_space); assert(existing_guard_iter != _explicit_transition_read_guard.cend()); auto db_guard = existing_guard_iter->second @@ -117,15 +126,14 @@ BucketDBUpdater::sendRequestBucketInfo( const document::Bucket& bucket, const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard) { - if (!_distributorComponent.storageNodeIsUp(bucket.getBucketSpace(), node)) { + if (!_op_ctx.storage_node_is_up(bucket.getBucketSpace(), node)) { return; } std::vector<document::BucketId> buckets; buckets.push_back(bucket.getBucketId()); - std::shared_ptr<api::RequestBucketInfoCommand> msg( - new api::RequestBucketInfoCommand(bucket.getBucketSpace(), buckets)); + auto msg = std::make_shared<api::RequestBucketInfoCommand>(bucket.getBucketSpace(), buckets); LOG(debug, "Sending request bucket info command %" PRIu64 " for " @@ -135,10 +143,10 @@ BucketDBUpdater::sendRequestBucketInfo( node); msg->setPriority(50); - msg->setAddress(_distributorComponent.nodeAddress(node)); + msg->setAddress(_node_ctx.node_address(node)); _sentMessages[msg->getMsgId()] = - BucketRequest(node, _distributorComponent.getUniqueTimestamp(), + BucketRequest(node, _op_ctx.generate_unique_timestamp(), bucket, mergeReplyGuard); _sender.sendCommand(msg); } @@ -203,8 +211,8 @@ BucketDBUpdater::removeSuperfluousBuckets( bool is_distribution_config_change) { const bool move_to_read_only_db = shouldDeferStateEnabling(); - const char* up_states = _distributorComponent.getDistributor().getStorageNodeUpStates(); - for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + const char* up_states = _op_ctx.storage_node_up_states(); + for (auto& elem : _op_ctx.bucket_space_repo()) { const auto& newDistribution(elem.second->getDistribution()); const auto& oldClusterState(elem.second->getClusterState()); const auto& new_cluster_state = newState.getDerivedClusterState(elem.first); @@ -221,14 +229,14 @@ BucketDBUpdater::removeSuperfluousBuckets( } auto& bucketDb(elem.second->getBucketDatabase()); - auto& readOnlyDb(_distributorComponent.getReadOnlyBucketSpaceRepo().get(elem.first).getBucketDatabase()); + auto& readOnlyDb(_op_ctx.read_only_bucket_space_repo().get(elem.first).getBucketDatabase()); // Remove all buckets not belonging to this distributor, or // being on storage nodes that are no longer up. MergingNodeRemover proc( oldClusterState, *new_cluster_state, - _distributorComponent.getIndex(), + _node_ctx.node_index(), newDistribution, up_states, move_to_read_only_db); @@ -254,12 +262,12 @@ void maybe_sleep_for(std::chrono::milliseconds ms) { void BucketDBUpdater::maybe_inject_simulated_db_pruning_delay() { - maybe_sleep_for(_distributorComponent.getDistributor().getConfig().simulated_db_pruning_latency()); + maybe_sleep_for(_op_ctx.distributor_config().simulated_db_pruning_latency()); } void BucketDBUpdater::maybe_inject_simulated_db_merging_delay() { - maybe_sleep_for(_distributorComponent.getDistributor().getConfig().simulated_db_merging_latency()); + maybe_sleep_for(_op_ctx.distributor_config().simulated_db_merging_latency()); } void @@ -269,21 +277,21 @@ BucketDBUpdater::ensureTransitionTimerStarted() // that will make transition times appear artificially low. if (!hasPendingClusterState()) { _transitionTimer = framework::MilliSecTimer( - _distributorComponent.getClock()); + _node_ctx.clock()); } } void BucketDBUpdater::completeTransitionTimer() { - _distributorComponent.getDistributor().getMetrics() + _distributor_interface.getMetrics() .stateTransitionTime.addValue(_transitionTimer.getElapsedTimeAsDouble()); } void BucketDBUpdater::clearReadOnlyBucketRepoDatabases() { - for (auto& space : _distributorComponent.getReadOnlyBucketSpaceRepo()) { + for (auto& space : _op_ctx.read_only_bucket_space_repo()) { space.second->getBucketDatabase().clear(); } } @@ -293,27 +301,27 @@ BucketDBUpdater::storageDistributionChanged() { ensureTransitionTimerStarted(); - removeSuperfluousBuckets(_distributorComponent.getClusterStateBundle(), true); + removeSuperfluousBuckets(_op_ctx.cluster_state_bundle(), true); - ClusterInformation::CSP clusterInfo(new SimpleClusterInformation( - _distributorComponent.getIndex(), - _distributorComponent.getClusterStateBundle(), - _distributorComponent.getDistributor().getStorageNodeUpStates())); + auto clusterInfo = std::make_shared<const SimpleClusterInformation>( + _node_ctx.node_index(), + _op_ctx.cluster_state_bundle(), + _op_ctx.storage_node_up_states()); _pendingClusterState = PendingClusterState::createForDistributionChange( - _distributorComponent.getClock(), + _node_ctx.clock(), std::move(clusterInfo), _sender, - _distributorComponent.getBucketSpaceRepo(), - _distributorComponent.getUniqueTimestamp()); + _op_ctx.bucket_space_repo(), + _op_ctx.generate_unique_timestamp()); _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); - _distributorComponent.getBucketSpaceRepo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); + _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); } void BucketDBUpdater::replyToPreviousPendingClusterStateIfAny() { if (_pendingClusterState.get() && _pendingClusterState->hasCommand()) { - _distributorComponent.sendUp( + _distributor_interface.getMessageSender().sendUp( std::make_shared<api::SetSystemStateReply>(*_pendingClusterState->getCommand())); } } @@ -325,12 +333,12 @@ BucketDBUpdater::replyToActivationWithActualVersion( { auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(cmd); reply->setActualVersion(actualVersion); - _distributorComponent.sendUp(reply); // TODO let API accept rvalues + _distributor_interface.getMessageSender().sendUp(reply); // TODO let API accept rvalues } void BucketDBUpdater::update_read_snapshot_before_db_pruning() { std::lock_guard lock(_distribution_context_mutex); - for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + for (auto& elem : _op_ctx.bucket_space_repo()) { // At this point, we're still operating with a distribution context _without_ a // pending state, i.e. anyone using the context will expect to find buckets // in the DB that correspond to how the database looked like prior to pruning @@ -347,9 +355,9 @@ void BucketDBUpdater::update_read_snapshot_before_db_pruning() { void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) { std::lock_guard lock(_distribution_context_mutex); - const auto old_default_state = _distributorComponent.getBucketSpaceRepo().get( + const auto old_default_state = _op_ctx.bucket_space_repo().get( document::FixedBucketSpaces::default_space()).cluster_state_sp(); - for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + for (auto& elem : _op_ctx.bucket_space_repo()) { auto new_distribution = elem.second->distribution_sp(); auto old_cluster_state = elem.second->cluster_state_sp(); auto new_cluster_state = new_state.getDerivedClusterState(elem.first); @@ -360,7 +368,7 @@ void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterSt old_default_state, std::move(new_cluster_state), std::move(new_distribution), - _distributorComponent.getIndex())); + _node_ctx.node_index())); // We can now remove the explicit mutable DB snapshot, as the buckets that have been // pruned away are visible in the read-only DB. _explicit_transition_read_guard[elem.first] = std::shared_ptr<BucketDatabase::ReadGuard>(); @@ -370,7 +378,7 @@ void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterSt void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) { std::lock_guard lock(_distribution_context_mutex); const auto& default_cluster_state = activated_state.getDerivedClusterState(document::FixedBucketSpaces::default_space()); - for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + for (auto& elem : _op_ctx.bucket_space_repo()) { auto new_distribution = elem.second->distribution_sp(); auto new_cluster_state = activated_state.getDerivedClusterState(elem.first); _active_distribution_contexts.insert_or_assign( @@ -379,7 +387,7 @@ void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterSt std::move(new_cluster_state), default_cluster_state, std::move(new_distribution), - _distributorComponent.getIndex())); + _node_ctx.node_index())); } } @@ -391,7 +399,7 @@ BucketDBUpdater::onSetSystemState( "Received new cluster state %s", cmd->getSystemState().toString().c_str()); - const lib::ClusterStateBundle oldState = _distributorComponent.getClusterStateBundle(); + const lib::ClusterStateBundle oldState = _op_ctx.cluster_state_bundle(); const lib::ClusterStateBundle& state = cmd->getClusterStateBundle(); if (state == oldState) { @@ -399,33 +407,31 @@ BucketDBUpdater::onSetSystemState( } ensureTransitionTimerStarted(); // Separate timer since _transitionTimer might span multiple pending states. - framework::MilliSecTimer process_timer(_distributorComponent.getClock()); + framework::MilliSecTimer process_timer(_node_ctx.clock()); update_read_snapshot_before_db_pruning(); const auto& bundle = cmd->getClusterStateBundle(); removeSuperfluousBuckets(bundle, false); update_read_snapshot_after_db_pruning(bundle); replyToPreviousPendingClusterStateIfAny(); - ClusterInformation::CSP clusterInfo( - new SimpleClusterInformation( - _distributorComponent.getIndex(), - _distributorComponent.getClusterStateBundle(), - _distributorComponent.getDistributor() - .getStorageNodeUpStates())); + auto clusterInfo = std::make_shared<const SimpleClusterInformation>( + _node_ctx.node_index(), + _op_ctx.cluster_state_bundle(), + _op_ctx.storage_node_up_states()); _pendingClusterState = PendingClusterState::createForClusterStateChange( - _distributorComponent.getClock(), + _node_ctx.clock(), std::move(clusterInfo), _sender, - _distributorComponent.getBucketSpaceRepo(), + _op_ctx.bucket_space_repo(), cmd, _outdatedNodesMap, - _distributorComponent.getUniqueTimestamp()); + _op_ctx.generate_unique_timestamp()); _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); - _distributorComponent.getDistributor().getMetrics().set_cluster_state_processing_time.addValue( + _distributor_interface.getMetrics().set_cluster_state_processing_time.addValue( process_timer.getElapsedTimeAsDouble()); - _distributorComponent.getBucketSpaceRepo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); + _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); if (isPendingClusterStateCompleted()) { processCompletedPendingClusterState(); } @@ -468,7 +474,7 @@ BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::Activa BucketDBUpdater::MergeReplyGuard::~MergeReplyGuard() { if (_reply) { - _updater.getDistributorComponent().getDistributor().handleCompletedMerge(_reply); + _distributor_interface.handleCompletedMerge(_reply); } } @@ -476,8 +482,7 @@ bool BucketDBUpdater::onMergeBucketReply( const std::shared_ptr<api::MergeBucketReply>& reply) { - std::shared_ptr<MergeReplyGuard> replyGuard( - new MergeReplyGuard(*this, reply)); + auto replyGuard = std::make_shared<MergeReplyGuard>(_distributor_interface, reply); // In case the merge was unsuccessful somehow, or some nodes weren't // actually merged (source-only nodes?) we request the bucket info of the @@ -523,8 +528,7 @@ BucketDBUpdater::onNotifyBucketChange( const std::shared_ptr<api::NotifyBucketChangeCommand>& cmd) { // Immediately schedule reply to ensure it is sent. - _sender.sendReply(std::shared_ptr<api::StorageReply>( - new api::NotifyBucketChangeReply(*cmd))); + _sender.sendReply(std::make_shared<api::NotifyBucketChangeReply>(*cmd)); if (!cmd->getBucketInfo().valid()) { LOG(error, @@ -594,7 +598,7 @@ BucketDBUpdater::handleSingleBucketInfoFailure( req.targetNode, repl->getResult().toString().c_str()); if (req.bucket.getBucketId() != document::BucketId(0)) { - framework::MilliSecTime sendTime(_distributorComponent.getClock()); + framework::MilliSecTime sendTime(_node_ctx.clock()); sendTime += framework::MilliSecTime(100); _delayedRequests.emplace_back(sendTime, req); } @@ -606,8 +610,10 @@ BucketDBUpdater::resendDelayedMessages() if (_pendingClusterState) { _pendingClusterState->resendDelayedMessages(); } - if (_delayedRequests.empty()) return; // Don't fetch time if not needed - framework::MilliSecTime currentTime(_distributorComponent.getClock()); + if (_delayedRequests.empty()) { + return; // Don't fetch time if not needed + } + framework::MilliSecTime currentTime(_node_ctx.clock()); while (!_delayedRequests.empty() && currentTime >= _delayedRequests.front().first) { @@ -662,7 +668,7 @@ BucketDBUpdater::processSingleBucketInfoReply( BucketRequest req = iter->second; _sentMessages.erase(iter); - if (!_distributorComponent.storageNodeIsUp(req.bucket.getBucketSpace(), req.targetNode)) { + if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) { // Ignore replies from nodes that are down. return true; } @@ -690,7 +696,7 @@ void BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket, BucketListMerger::BucketList& existing) { - auto &distributorBucketSpace(_distributorComponent.getBucketSpaceRepo().get(bucket.getBucketSpace())); + auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket.getBucketSpace())); std::vector<BucketDatabase::Entry> entries; distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries); @@ -704,12 +710,12 @@ BucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_t node { for (const document::BucketId & bucketId : merger.getRemovedEntries()) { document::Bucket bucket(bucketSpace, bucketId); - _distributorComponent.removeNodeFromDB(bucket, node); + _op_ctx.remove_node_from_bucket_database(bucket, node); } for (const BucketListMerger::BucketEntry& entry : merger.getAddedEntries()) { document::Bucket bucket(bucketSpace, entry.first); - _distributorComponent.updateBucketDatabase( + _op_ctx.update_bucket_database( bucket, BucketCopy(merger.getTimestamp(), node, entry.second), DatabaseUpdate::CREATE_IF_NONEXISTING); @@ -737,7 +743,7 @@ BucketDBUpdater::processCompletedPendingClusterState() // taken effect via activation. External operation handler will keep operations from // actually being scheduled until state has been activated. The external operation handler // needs to be explicitly aware of the case where no state has yet to be activated. - _distributorComponent.getDistributor().getMessageSender().sendDown( + _distributor_interface.getMessageSender().sendDown( _pendingClusterState->getCommand()); _pendingClusterState->clearCommand(); return; @@ -750,7 +756,7 @@ BucketDBUpdater::processCompletedPendingClusterState() void BucketDBUpdater::activatePendingClusterState() { - framework::MilliSecTimer process_timer(_distributorComponent.getClock()); + framework::MilliSecTimer process_timer(_node_ctx.clock()); _pendingClusterState->mergeIntoBucketDatabases(); maybe_inject_simulated_db_merging_delay(); @@ -759,7 +765,7 @@ BucketDBUpdater::activatePendingClusterState() LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion()); enableCurrentClusterStateBundleInDistributor(); if (_pendingClusterState->hasCommand()) { - _distributorComponent.getDistributor().getMessageSender().sendDown( + _distributor_interface.getMessageSender().sendDown( _pendingClusterState->getCommand()); } addCurrentStateToClusterStateHistory(); @@ -767,18 +773,18 @@ BucketDBUpdater::activatePendingClusterState() LOG(debug, "Activating pending distribution config"); // TODO distribution changes cannot currently be deferred as they are not // initiated by the cluster controller! - _distributorComponent.getDistributor().notifyDistributionChangeEnabled(); + _distributor_interface.notifyDistributionChangeEnabled(); } update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle()); _pendingClusterState.reset(); _outdatedNodesMap.clear(); - _distributorComponent.getBucketSpaceRepo().clear_pending_cluster_state_bundle(); + _op_ctx.bucket_space_repo().clear_pending_cluster_state_bundle(); sendAllQueuedBucketRechecks(); completeTransitionTimer(); clearReadOnlyBucketRepoDatabases(); - _distributorComponent.getDistributor().getMetrics().activate_cluster_state_processing_time.addValue( + _distributor_interface.getMetrics().activate_cluster_state_processing_time.addValue( process_timer.getElapsedTimeAsDouble()); } @@ -792,12 +798,12 @@ BucketDBUpdater::enableCurrentClusterStateBundleInDistributor() "BucketDBUpdater finished processing state %s", state.getBaselineClusterState()->toString().c_str()); - _distributorComponent.getDistributor().enableClusterStateBundle(state); + _distributor_interface.enableClusterStateBundle(state); } void BucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) { update_read_snapshot_after_activation(activated_state); - _distributorComponent.getDistributor().enableClusterStateBundle(activated_state); + _distributor_interface.enableClusterStateBundle(activated_state); } void @@ -863,7 +869,7 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos, using namespace vespalib::xml; xos << XmlTag("bucketdb") << XmlTag("systemstate_active") - << XmlContent(_distributorComponent.getClusterStateBundle().getBaselineClusterState()->toString()) + << XmlContent(_op_ctx.cluster_state_bundle().getBaselineClusterState()->toString()) << XmlEndTag(); if (_pendingClusterState) { xos << *_pendingClusterState; diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index d70a4fe3409..8fab76575e9 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -57,7 +57,8 @@ public: vespalib::string getReportContentType(const framework::HttpUrlPath&) const override; bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override; void print(std::ostream& out, bool verbose, const std::string& indent) const; - DistributorComponent& getDistributorComponent() { return _distributorComponent; } + const DistributorNodeContext& node_context() const { return _node_ctx; } + DistributorOperationContext& operation_context() { return _op_ctx; } /** * Returns whether the current PendingClusterState indicates that there has @@ -78,11 +79,10 @@ public: OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const; private: - DistributorComponent _distributorComponent; class MergeReplyGuard { public: - MergeReplyGuard(BucketDBUpdater& updater, const std::shared_ptr<api::MergeBucketReply>& reply) - : _updater(updater), _reply(reply) {} + MergeReplyGuard(DistributorInterface& distributor_interface, const std::shared_ptr<api::MergeBucketReply>& reply) noexcept + : _distributor_interface(distributor_interface), _reply(reply) {} ~MergeReplyGuard(); @@ -90,7 +90,7 @@ private: // than send it down void resetReply() { _reply.reset(); } private: - BucketDBUpdater& _updater; + DistributorInterface& _distributor_interface; std::shared_ptr<api::MergeBucketReply> _reply; }; @@ -239,6 +239,10 @@ private: mutable bool _cachedOwned; }; + DistributorComponent _distributorComponent; + const DistributorNodeContext& _node_ctx; + DistributorOperationContext& _op_ctx; + DistributorInterface& _distributor_interface; std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests; std::map<uint64_t, BucketRequest> _sentMessages; std::unique_ptr<PendingClusterState> _pendingClusterState; diff --git a/storage/src/vespa/storage/distributor/distributor_node_context.h b/storage/src/vespa/storage/distributor/distributor_node_context.h index 3cb0f509ea7..805e54342dc 100644 --- a/storage/src/vespa/storage/distributor/distributor_node_context.h +++ b/storage/src/vespa/storage/distributor/distributor_node_context.h @@ -3,6 +3,7 @@ #pragma once #include <vespa/storage/common/cluster_context.h> +#include <vespa/storageapi/messageapi/storagemessage.h> #include <cstdint> namespace document { class BucketIdFactory; } @@ -20,6 +21,7 @@ public: virtual const framework::Clock& clock() const noexcept = 0; virtual const document::BucketIdFactory& bucket_id_factory() const noexcept = 0; virtual uint16_t node_index() const noexcept = 0; + virtual api::StorageMessageAddress node_address(uint16_t node_index) const noexcept = 0; }; } diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h index 97a522a694a..0b47c71e2e1 100644 --- a/storage/src/vespa/storage/distributor/distributor_operation_context.h +++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h @@ -50,6 +50,7 @@ public: uint32_t message_type) const = 0; virtual const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const = 0; virtual const lib::ClusterStateBundle& cluster_state_bundle() const = 0; + virtual bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const = 0; // TODO: Move to being a free function instead. virtual const char* storage_node_up_states() const = 0; diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h index b4a0b00a2e1..ca953ed01ef 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.h +++ b/storage/src/vespa/storage/distributor/distributorcomponent.h @@ -149,6 +149,7 @@ public: const vespalib::string * cluster_name_ptr() const noexcept override { return cluster_context().cluster_name_ptr(); } const document::BucketIdFactory& bucket_id_factory() const noexcept override { return getBucketIdFactory(); } uint16_t node_index() const noexcept override { return getIndex(); } + api::StorageMessageAddress node_address(uint16_t node_index) const noexcept override { return nodeAddress(node_index); } // Implements DistributorOperationContext api::Timestamp generate_unique_timestamp() override { return getUniqueTimestamp(); } @@ -203,6 +204,9 @@ public: const lib::ClusterStateBundle& cluster_state_bundle() const override { return getClusterStateBundle(); } + bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const override { + return storageNodeIsUp(bucket_space, node_index); + } const char* storage_node_up_states() const override { return getDistributor().getStorageNodeUpStates(); } diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 175c7d27033..a7fd5a5af53 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -32,7 +32,9 @@ PendingClusterState::PendingClusterState( const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp) : _cmd(newStateCmd), + _sentMessages(), _requestedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)), + _delayedRequests(), _prevClusterStateBundle(clusterInfo->getClusterStateBundle()), _newClusterStateBundle(newStateCmd->getClusterStateBundle()), _clock(clock), @@ -222,12 +224,11 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode) _newClusterStateBundle.getDerivedClusterState(bucketSpaceAndNode.bucketSpace)->toString().c_str(), distributionHash.c_str()); - std::shared_ptr<api::RequestBucketInfoCommand> cmd( - new api::RequestBucketInfoCommand( + auto cmd = std::make_shared<api::RequestBucketInfoCommand>( bucketSpaceAndNode.bucketSpace, _sender.getDistributorIndex(), *_newClusterStateBundle.getDerivedClusterState(bucketSpaceAndNode.bucketSpace), - distributionHash)); + distributionHash); cmd->setPriority(api::StorageMessage::HIGH); cmd->setTimeout(vespalib::duration::max()); diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index 6d558d05640..42b7bf0dcf2 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -160,7 +160,7 @@ private: /** * Creates a pending cluster state that represents - * a set system state command from the fleet controller. + * a set system state command from the cluster controller. */ PendingClusterState( const framework::Clock&, |