summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-04-26 14:57:15 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-04-26 14:57:15 +0000
commitff386ea68d4e8432201fc6125281a188988c99cf (patch)
tree9e6074a7fa14a12e3292c4ebf417abd472eb5427 /storage
parentef6f81bb8db68b97972770fede264a97a5d5140d (diff)
Remove processing of single bucket info replies.
These are per stripe and handled by StripeBucketDBUpdater instead.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp116
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h23
2 files changed, 1 insertions, 138 deletions
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 0631c4351dd..f328b599604 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -391,7 +391,7 @@ BucketDBUpdater::onRequestBucketInfoReply(
if (pending_cluster_state_accepted(repl)) {
return true;
}
- return process_single_bucket_info_reply(repl);
+ return false;
}
bool
@@ -413,21 +413,6 @@ BucketDBUpdater::pending_cluster_state_accepted(
}
void
-BucketDBUpdater::handle_single_bucket_info_failure(
- const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- const BucketRequest& req)
-{
- LOG(debug, "Request bucket info failed towards node %d: error was %s",
- req.targetNode, repl->getResult().toString().c_str());
-
- if (req.bucket.getBucketId() != document::BucketId(0)) {
- framework::MilliSecTime sendTime(_node_ctx.clock());
- sendTime += framework::MilliSecTime(100);
- _delayed_requests.emplace_back(sendTime, req);
- }
-}
-
-void
BucketDBUpdater::resend_delayed_messages()
{
if (_pending_cluster_state) {
@@ -446,105 +431,6 @@ BucketDBUpdater::resend_delayed_messages()
}
}
-void
-BucketDBUpdater::convert_bucket_info_to_bucket_list(
- const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- uint16_t targetNode, BucketListMerger::BucketList& newList)
-{
- for (const auto & entry : repl->getBucketInfo()) {
- LOG(debug, "Received bucket information from node %u for bucket %s: %s", targetNode,
- entry._bucketId.toString().c_str(), entry._info.toString().c_str());
-
- newList.emplace_back(entry._bucketId, entry._info);
- }
-}
-
-void
-BucketDBUpdater::merge_bucket_info_with_database(
- const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- const BucketRequest& req)
-{
- BucketListMerger::BucketList existing;
- BucketListMerger::BucketList newList;
-
- find_related_buckets_in_database(req.targetNode, req.bucket, existing);
- convert_bucket_info_to_bucket_list(repl, req.targetNode, newList);
-
- std::sort(existing.begin(), existing.end(), sort_pred);
- std::sort(newList.begin(), newList.end(), sort_pred);
-
- BucketListMerger merger(newList, existing, req.timestamp);
- update_database(req.bucket.getBucketSpace(), req.targetNode, merger);
-}
-
-bool
-BucketDBUpdater::process_single_bucket_info_reply(
- const std::shared_ptr<api::RequestBucketInfoReply> & repl)
-{
- auto iter = _sent_messages.find(repl->getMsgId());
-
- // Has probably been deleted for some reason earlier.
- if (iter == _sent_messages.end()) {
- return true;
- }
-
- BucketRequest req = iter->second;
- _sent_messages.erase(iter);
-
- if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) {
- // Ignore replies from nodes that are down.
- return true;
- }
- if (repl->getResult().getResult() != api::ReturnCode::OK) {
- handle_single_bucket_info_failure(repl, req);
- return true;
- }
- merge_bucket_info_with_database(repl, req);
- return true;
-}
-
-void
-BucketDBUpdater::add_bucket_info_for_node(
- const BucketDatabase::Entry& e,
- uint16_t node,
- BucketListMerger::BucketList& existing) const
-{
- const BucketCopy* copy(e->getNode(node));
- if (copy) {
- existing.emplace_back(e.getBucketId(), copy->getBucketInfo());
- }
-}
-
-void
-BucketDBUpdater::find_related_buckets_in_database(uint16_t node, const document::Bucket& bucket,
- BucketListMerger::BucketList& existing)
-{
- auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket.getBucketSpace()));
- std::vector<BucketDatabase::Entry> entries;
- distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries);
-
- for (const BucketDatabase::Entry & entry : entries) {
- add_bucket_info_for_node(entry, node, existing);
- }
-}
-
-void
-BucketDBUpdater::update_database(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger)
-{
- for (const document::BucketId & bucketId : merger.getRemovedEntries()) {
- document::Bucket bucket(bucketSpace, bucketId);
- _op_ctx.remove_node_from_bucket_database(bucket, node);
- }
-
- for (const BucketListMerger::BucketEntry& entry : merger.getAddedEntries()) {
- document::Bucket bucket(bucketSpace, entry.first);
- _op_ctx.update_bucket_database(
- bucket,
- BucketCopy(merger.getTimestamp(), node, entry.second),
- DatabaseUpdate::CREATE_IF_NONEXISTING);
- }
-}
-
bool
BucketDBUpdater::is_pending_cluster_state_completed() const
{
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index a916aa8aa8a..b990f094e9c 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -136,36 +136,13 @@ private:
bool should_defer_state_enabling() const noexcept;
bool has_pending_cluster_state() const;
bool pending_cluster_state_accepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
- bool process_single_bucket_info_reply(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
- void handle_single_bucket_info_failure(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- const BucketRequest& req);
bool is_pending_cluster_state_completed() const;
void process_completed_pending_cluster_state(StripeAccessGuard& guard);
void activate_pending_cluster_state(StripeAccessGuard& guard);
- void merge_bucket_info_with_database(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- const BucketRequest& req);
- void convert_bucket_info_to_bucket_list(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- uint16_t targetNode, BucketListMerger::BucketList& newList);
void send_request_bucket_info(uint16_t node, const document::Bucket& bucket,
const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard);
- void add_bucket_info_for_node(const BucketDatabase::Entry& e, uint16_t node,
- BucketListMerger::BucketList& existing) const;
void ensure_transition_timer_started();
void complete_transition_timer();
- /**
- * Adds all buckets contained in the bucket database
- * that are either contained
- * in bucketId, or that bucketId is contained in, that have copies
- * on the given node.
- */
- void find_related_buckets_in_database(uint16_t node, const document::Bucket& bucket,
- BucketListMerger::BucketList& existing);
-
- /**
- Updates the bucket database from the information generated by the given
- bucket list merger.
- */
- void update_database(document::BucketSpace bucketSpace, uint16_t node, BucketListMerger& merger);
void remove_superfluous_buckets(StripeAccessGuard& guard,
const lib::ClusterStateBundle& new_state,