diff options
author | Geir Storli <geirst@verizonmedia.com> | 2021-04-26 14:57:15 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2021-04-26 14:57:15 +0000 |
commit | ff386ea68d4e8432201fc6125281a188988c99cf (patch) | |
tree | 9e6074a7fa14a12e3292c4ebf417abd472eb5427 /storage | |
parent | ef6f81bb8db68b97972770fede264a97a5d5140d (diff) |
Remove processing of single bucket info replies.
These are per stripe and handled by StripeBucketDBUpdater instead.
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/vespa/storage/distributor/bucketdbupdater.cpp | 116 | ||||
-rw-r--r-- | storage/src/vespa/storage/distributor/bucketdbupdater.h | 23 |
2 files changed, 1 insertions, 138 deletions
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 0631c4351dd..f328b599604 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -391,7 +391,7 @@ BucketDBUpdater::onRequestBucketInfoReply( if (pending_cluster_state_accepted(repl)) { return true; } - return process_single_bucket_info_reply(repl); + return false; } bool @@ -413,21 +413,6 @@ BucketDBUpdater::pending_cluster_state_accepted( } void -BucketDBUpdater::handle_single_bucket_info_failure( - const std::shared_ptr<api::RequestBucketInfoReply>& repl, - const BucketRequest& req) -{ - LOG(debug, "Request bucket info failed towards node %d: error was %s", - req.targetNode, repl->getResult().toString().c_str()); - - if (req.bucket.getBucketId() != document::BucketId(0)) { - framework::MilliSecTime sendTime(_node_ctx.clock()); - sendTime += framework::MilliSecTime(100); - _delayed_requests.emplace_back(sendTime, req); - } -} - -void BucketDBUpdater::resend_delayed_messages() { if (_pending_cluster_state) { @@ -446,105 +431,6 @@ BucketDBUpdater::resend_delayed_messages() } } -void -BucketDBUpdater::convert_bucket_info_to_bucket_list( - const std::shared_ptr<api::RequestBucketInfoReply>& repl, - uint16_t targetNode, BucketListMerger::BucketList& newList) -{ - for (const auto & entry : repl->getBucketInfo()) { - LOG(debug, "Received bucket information from node %u for bucket %s: %s", targetNode, - entry._bucketId.toString().c_str(), entry._info.toString().c_str()); - - newList.emplace_back(entry._bucketId, entry._info); - } -} - -void -BucketDBUpdater::merge_bucket_info_with_database( - const std::shared_ptr<api::RequestBucketInfoReply>& repl, - const BucketRequest& req) -{ - BucketListMerger::BucketList existing; - BucketListMerger::BucketList newList; - - find_related_buckets_in_database(req.targetNode, req.bucket, existing); - convert_bucket_info_to_bucket_list(repl, req.targetNode, newList); - - std::sort(existing.begin(), existing.end(), sort_pred); - std::sort(newList.begin(), newList.end(), sort_pred); - - BucketListMerger merger(newList, existing, req.timestamp); - update_database(req.bucket.getBucketSpace(), req.targetNode, merger); -} - -bool -BucketDBUpdater::process_single_bucket_info_reply( - const std::shared_ptr<api::RequestBucketInfoReply> & repl) -{ - auto iter = _sent_messages.find(repl->getMsgId()); - - // Has probably been deleted for some reason earlier. - if (iter == _sent_messages.end()) { - return true; - } - - BucketRequest req = iter->second; - _sent_messages.erase(iter); - - if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) { - // Ignore replies from nodes that are down. - return true; - } - if (repl->getResult().getResult() != api::ReturnCode::OK) { - handle_single_bucket_info_failure(repl, req); - return true; - } - merge_bucket_info_with_database(repl, req); - return true; -} - -void -BucketDBUpdater::add_bucket_info_for_node( - const BucketDatabase::Entry& e, - uint16_t node, - BucketListMerger::BucketList& existing) const -{ - const BucketCopy* copy(e->getNode(node)); - if (copy) { - existing.emplace_back(e.getBucketId(), copy->getBucketInfo()); - } -} - -void -BucketDBUpdater::find_related_buckets_in_database(uint16_t node, const document::Bucket& bucket, - BucketListMerger::BucketList& existing) -{ - auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket.getBucketSpace())); - std::vector<BucketDatabase::Entry> entries; - distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries); - - for (const BucketDatabase::Entry & entry : entries) { - add_bucket_info_for_node(entry, node, existing); - } -} - -void -BucketDBUpdater::update_database(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger) -{ - for (const document::BucketId & bucketId : merger.getRemovedEntries()) { - document::Bucket bucket(bucketSpace, bucketId); - _op_ctx.remove_node_from_bucket_database(bucket, node); - } - - for (const BucketListMerger::BucketEntry& entry : merger.getAddedEntries()) { - document::Bucket bucket(bucketSpace, entry.first); - _op_ctx.update_bucket_database( - bucket, - BucketCopy(merger.getTimestamp(), node, entry.second), - DatabaseUpdate::CREATE_IF_NONEXISTING); - } -} - bool BucketDBUpdater::is_pending_cluster_state_completed() const { diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index a916aa8aa8a..b990f094e9c 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -136,36 +136,13 @@ private: bool should_defer_state_enabling() const noexcept; bool has_pending_cluster_state() const; bool pending_cluster_state_accepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl); - bool process_single_bucket_info_reply(const std::shared_ptr<api::RequestBucketInfoReply>& repl); - void handle_single_bucket_info_failure(const std::shared_ptr<api::RequestBucketInfoReply>& repl, - const BucketRequest& req); bool is_pending_cluster_state_completed() const; void process_completed_pending_cluster_state(StripeAccessGuard& guard); void activate_pending_cluster_state(StripeAccessGuard& guard); - void merge_bucket_info_with_database(const std::shared_ptr<api::RequestBucketInfoReply>& repl, - const BucketRequest& req); - void convert_bucket_info_to_bucket_list(const std::shared_ptr<api::RequestBucketInfoReply>& repl, - uint16_t targetNode, BucketListMerger::BucketList& newList); void send_request_bucket_info(uint16_t node, const document::Bucket& bucket, const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard); - void add_bucket_info_for_node(const BucketDatabase::Entry& e, uint16_t node, - BucketListMerger::BucketList& existing) const; void ensure_transition_timer_started(); void complete_transition_timer(); - /** - * Adds all buckets contained in the bucket database - * that are either contained - * in bucketId, or that bucketId is contained in, that have copies - * on the given node. - */ - void find_related_buckets_in_database(uint16_t node, const document::Bucket& bucket, - BucketListMerger::BucketList& existing); - - /** - Updates the bucket database from the information generated by the given - bucket list merger. - */ - void update_database(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger); void remove_superfluous_buckets(StripeAccessGuard& guard, const lib::ClusterStateBundle& new_state, |