diff options
Diffstat (limited to 'storage')
4 files changed, 39 insertions, 34 deletions
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index e6f44c92dbe..710471bc167 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -32,6 +32,7 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner, : framework::StatusReporter("bucketdb", "Bucket DB Updater"), _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"), _node_ctx(_distributorComponent), + _op_ctx(_distributorComponent), _delayedRequests(), _sentMessages(), _pendingClusterState(), @@ -45,7 +46,7 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner, _explicit_transition_read_guard(), _distribution_context_mutex() { - for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + for (auto& elem : _op_ctx.bucket_space_repo()) { _active_distribution_contexts.emplace( elem.first, BucketSpaceDistributionContext::make_not_yet_initialized(_node_ctx.node_index())); @@ -69,8 +70,8 @@ OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const documen return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second); } const auto& space_repo = bucket_present_in_mutable_db - ? _distributorComponent.getBucketSpaceRepo() - : _distributorComponent.getReadOnlyBucketSpaceRepo(); + ? _op_ctx.bucket_space_repo() + : _op_ctx.read_only_bucket_space_repo(); auto existing_guard_iter = _explicit_transition_read_guard.find(bucket_space); assert(existing_guard_iter != _explicit_transition_read_guard.cend()); auto db_guard = existing_guard_iter->second @@ -124,7 +125,7 @@ BucketDBUpdater::sendRequestBucketInfo( const document::Bucket& bucket, const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard) { - if (!_distributorComponent.storageNodeIsUp(bucket.getBucketSpace(), node)) { + if (!_op_ctx.storage_node_is_up(bucket.getBucketSpace(), node)) { return; } @@ -144,7 +145,7 @@ BucketDBUpdater::sendRequestBucketInfo( msg->setAddress(_node_ctx.node_address(node)); _sentMessages[msg->getMsgId()] = - BucketRequest(node, _distributorComponent.getUniqueTimestamp(), + BucketRequest(node, _op_ctx.generate_unique_timestamp(), bucket, mergeReplyGuard); _sender.sendCommand(msg); } @@ -209,8 +210,8 @@ BucketDBUpdater::removeSuperfluousBuckets( bool is_distribution_config_change) { const bool move_to_read_only_db = shouldDeferStateEnabling(); - const char* up_states = _distributorComponent.getDistributor().getStorageNodeUpStates(); - for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + const char* up_states = _op_ctx.storage_node_up_states(); + for (auto& elem : _op_ctx.bucket_space_repo()) { const auto& newDistribution(elem.second->getDistribution()); const auto& oldClusterState(elem.second->getClusterState()); const auto& new_cluster_state = newState.getDerivedClusterState(elem.first); @@ -227,7 +228,7 @@ BucketDBUpdater::removeSuperfluousBuckets( } auto& bucketDb(elem.second->getBucketDatabase()); - auto& readOnlyDb(_distributorComponent.getReadOnlyBucketSpaceRepo().get(elem.first).getBucketDatabase()); + auto& readOnlyDb(_op_ctx.read_only_bucket_space_repo().get(elem.first).getBucketDatabase()); // Remove all buckets not belonging to this distributor, or // being on storage nodes that are no longer up. @@ -260,12 +261,12 @@ void maybe_sleep_for(std::chrono::milliseconds ms) { void BucketDBUpdater::maybe_inject_simulated_db_pruning_delay() { - maybe_sleep_for(_distributorComponent.getDistributor().getConfig().simulated_db_pruning_latency()); + maybe_sleep_for(_op_ctx.distributor_config().simulated_db_pruning_latency()); } void BucketDBUpdater::maybe_inject_simulated_db_merging_delay() { - maybe_sleep_for(_distributorComponent.getDistributor().getConfig().simulated_db_merging_latency()); + maybe_sleep_for(_op_ctx.distributor_config().simulated_db_merging_latency()); } void @@ -289,7 +290,7 @@ BucketDBUpdater::completeTransitionTimer() void BucketDBUpdater::clearReadOnlyBucketRepoDatabases() { - for (auto& space : _distributorComponent.getReadOnlyBucketSpaceRepo()) { + for (auto& space : _op_ctx.read_only_bucket_space_repo()) { space.second->getBucketDatabase().clear(); } } @@ -299,20 +300,20 @@ BucketDBUpdater::storageDistributionChanged() { ensureTransitionTimerStarted(); - removeSuperfluousBuckets(_distributorComponent.getClusterStateBundle(), true); + removeSuperfluousBuckets(_op_ctx.cluster_state_bundle(), true); auto clusterInfo = std::make_shared<const SimpleClusterInformation>( _node_ctx.node_index(), - _distributorComponent.getClusterStateBundle(), - _distributorComponent.getDistributor().getStorageNodeUpStates()); + _op_ctx.cluster_state_bundle(), + _op_ctx.storage_node_up_states()); _pendingClusterState = PendingClusterState::createForDistributionChange( _node_ctx.clock(), std::move(clusterInfo), _sender, - _distributorComponent.getBucketSpaceRepo(), - _distributorComponent.getUniqueTimestamp()); + _op_ctx.bucket_space_repo(), + _op_ctx.generate_unique_timestamp()); _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); - _distributorComponent.getBucketSpaceRepo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); + _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); } void @@ -336,7 +337,7 @@ BucketDBUpdater::replyToActivationWithActualVersion( void BucketDBUpdater::update_read_snapshot_before_db_pruning() { std::lock_guard lock(_distribution_context_mutex); - for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + for (auto& elem : _op_ctx.bucket_space_repo()) { // At this point, we're still operating with a distribution context _without_ a // pending state, i.e. anyone using the context will expect to find buckets // in the DB that correspond to how the database looked like prior to pruning @@ -353,9 +354,9 @@ void BucketDBUpdater::update_read_snapshot_before_db_pruning() { void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) { std::lock_guard lock(_distribution_context_mutex); - const auto old_default_state = _distributorComponent.getBucketSpaceRepo().get( + const auto old_default_state = _op_ctx.bucket_space_repo().get( document::FixedBucketSpaces::default_space()).cluster_state_sp(); - for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + for (auto& elem : _op_ctx.bucket_space_repo()) { auto new_distribution = elem.second->distribution_sp(); auto old_cluster_state = elem.second->cluster_state_sp(); auto new_cluster_state = new_state.getDerivedClusterState(elem.first); @@ -376,7 +377,7 @@ void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterSt void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) { std::lock_guard lock(_distribution_context_mutex); const auto& default_cluster_state = activated_state.getDerivedClusterState(document::FixedBucketSpaces::default_space()); - for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { + for (auto& elem : _op_ctx.bucket_space_repo()) { auto new_distribution = elem.second->distribution_sp(); auto new_cluster_state = activated_state.getDerivedClusterState(elem.first); _active_distribution_contexts.insert_or_assign( @@ -397,7 +398,7 @@ BucketDBUpdater::onSetSystemState( "Received new cluster state %s", cmd->getSystemState().toString().c_str()); - const lib::ClusterStateBundle oldState = _distributorComponent.getClusterStateBundle(); + const lib::ClusterStateBundle oldState = _op_ctx.cluster_state_bundle(); const lib::ClusterStateBundle& state = cmd->getClusterStateBundle(); if (state == oldState) { @@ -414,23 +415,22 @@ BucketDBUpdater::onSetSystemState( auto clusterInfo = std::make_shared<const SimpleClusterInformation>( _node_ctx.node_index(), - _distributorComponent.getClusterStateBundle(), - _distributorComponent.getDistributor() - .getStorageNodeUpStates()); + _op_ctx.cluster_state_bundle(), + _op_ctx.storage_node_up_states()); _pendingClusterState = PendingClusterState::createForClusterStateChange( _node_ctx.clock(), std::move(clusterInfo), _sender, - _distributorComponent.getBucketSpaceRepo(), + _op_ctx.bucket_space_repo(), cmd, _outdatedNodesMap, - _distributorComponent.getUniqueTimestamp()); + _op_ctx.generate_unique_timestamp()); _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); _distributorComponent.getDistributor().getMetrics().set_cluster_state_processing_time.addValue( process_timer.getElapsedTimeAsDouble()); - _distributorComponent.getBucketSpaceRepo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); + _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); if (isPendingClusterStateCompleted()) { processCompletedPendingClusterState(); } @@ -667,7 +667,7 @@ BucketDBUpdater::processSingleBucketInfoReply( BucketRequest req = iter->second; _sentMessages.erase(iter); - if (!_distributorComponent.storageNodeIsUp(req.bucket.getBucketSpace(), req.targetNode)) { + if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) { // Ignore replies from nodes that are down. return true; } @@ -695,7 +695,7 @@ void BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket, BucketListMerger::BucketList& existing) { - auto &distributorBucketSpace(_distributorComponent.getBucketSpaceRepo().get(bucket.getBucketSpace())); + auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket.getBucketSpace())); std::vector<BucketDatabase::Entry> entries; distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries); @@ -709,12 +709,12 @@ BucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_t node { for (const document::BucketId & bucketId : merger.getRemovedEntries()) { document::Bucket bucket(bucketSpace, bucketId); - _distributorComponent.removeNodeFromDB(bucket, node); + _op_ctx.remove_node_from_bucket_database(bucket, node); } for (const BucketListMerger::BucketEntry& entry : merger.getAddedEntries()) { document::Bucket bucket(bucketSpace, entry.first); - _distributorComponent.updateBucketDatabase( + _op_ctx.update_bucket_database( bucket, BucketCopy(merger.getTimestamp(), node, entry.second), DatabaseUpdate::CREATE_IF_NONEXISTING); @@ -778,7 +778,7 @@ BucketDBUpdater::activatePendingClusterState() update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle()); _pendingClusterState.reset(); _outdatedNodesMap.clear(); - _distributorComponent.getBucketSpaceRepo().clear_pending_cluster_state_bundle(); + _op_ctx.bucket_space_repo().clear_pending_cluster_state_bundle(); sendAllQueuedBucketRechecks(); completeTransitionTimer(); clearReadOnlyBucketRepoDatabases(); @@ -868,7 +868,7 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos, using namespace vespalib::xml; xos << XmlTag("bucketdb") << XmlTag("systemstate_active") - << XmlContent(_distributorComponent.getClusterStateBundle().getBaselineClusterState()->toString()) + << XmlContent(_op_ctx.cluster_state_bundle().getBaselineClusterState()->toString()) << XmlEndTag(); if (_pendingClusterState) { xos << *_pendingClusterState; diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 5503e6bf22e..d706d162050 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -240,6 +240,7 @@ private: DistributorComponent _distributorComponent; const DistributorNodeContext& _node_ctx; + DistributorOperationContext& _op_ctx; std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests; std::map<uint64_t, BucketRequest> _sentMessages; std::unique_ptr<PendingClusterState> _pendingClusterState; diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h index 97a522a694a..0b47c71e2e1 100644 --- a/storage/src/vespa/storage/distributor/distributor_operation_context.h +++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h @@ -50,6 +50,7 @@ public: uint32_t message_type) const = 0; virtual const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const = 0; virtual const lib::ClusterStateBundle& cluster_state_bundle() const = 0; + virtual bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const = 0; // TODO: Move to being a free function instead. virtual const char* storage_node_up_states() const = 0; diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h index c2ab258be35..ca953ed01ef 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.h +++ b/storage/src/vespa/storage/distributor/distributorcomponent.h @@ -204,6 +204,9 @@ public: const lib::ClusterStateBundle& cluster_state_bundle() const override { return getClusterStateBundle(); } + bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const override { + return storageNodeIsUp(bucket_space, node_index); + } const char* storage_node_up_states() const override { return getDistributor().getStorageNodeUpStates(); } |