diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-11-30 15:58:21 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-11-30 16:08:12 +0000 |
commit | 77d2ebfdd7a8d38161ce0dcd77cc431ea1899ec1 (patch) | |
tree | 6d3c22d27275edc3c0d7a1da939e493c8e3e13a5 /storage | |
parent | c105ea3963f924c3bd501e7f65cdaae3eb9edf56 (diff) |
Don't let ignored bucket info reply be propagated out of distributor
If a reply arrives for a preempted cluster state, it will be ignored.
To avoid it being automatically sent further down the storage chain,
we still have to treat it as handled. Otherwise, a scary-looking but
benign "unhandled message" warning will be emitted in the
Vespa log.
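Also move an existing test (node_feature_sets_are_aggregated_from_nodes_and_propagated_to_stripes) from the BucketDBUpdaterSnapshotTest fixture to the correct test fixture, TopLevelBucketDBUpdaterTest.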
Diffstat (limited to 'storage')
3 files changed, 61 insertions, 44 deletions
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp index 3ed5e9f4a8d..1632867b627 100644 --- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp @@ -2548,6 +2548,55 @@ TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_onl } } +TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_and_propagated_to_stripes) { + lib::ClusterState state("distributor:1 storage:3"); + set_cluster_state(state); + uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1; + + // Known feature sets are initially empty. + auto stripes = distributor_stripes(); + for (auto* s : stripes) { + for (uint16_t i : {0, 1, 2}) { + EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining); + } + } + + ASSERT_EQ(expected_msgs, _sender.commands().size()); + for (uint32_t i = 0; i < _sender.commands().size(); i++) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i), + dummy_buckets_to_return, [i](auto& reply) noexcept { + // Pretend nodes 1 and 2 are on a shiny version with unordered merge chaining supported. + // Node 0 does not support the fanciness. 
+ if (i > 0) { + reply.supported_node_features().unordered_merge_chaining = true; + } + })); + } + + // Node features should be propagated to all stripes + for (auto* s : stripes) { + EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining); + EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining); + EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining); + } +} + +TEST_F(TopLevelBucketDBUpdaterTest, outdated_bucket_info_reply_is_ignored) { + set_cluster_state("version:1 distributor:1 storage:1"); + ASSERT_EQ(message_count(1), _sender.commands().size()); + auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.commands().front()); + _sender.clear(); + // Force a new pending cluster state which overwrites the pending one. + lib::ClusterState new_state("version:2 distributor:1 storage:2"); + set_cluster_state(new_state); + + const api::StorageMessageAddress& address(*req->getAddress()); + bool handled = bucket_db_updater().onRequestBucketInfoReply( + make_fake_bucket_reply(new_state, *req, address.getIndex(), 0, 0)); + EXPECT_TRUE(handled); // Should be returned as handled even though it's technically ignored. +} + + struct BucketDBUpdaterSnapshotTest : TopLevelBucketDBUpdaterTest { lib::ClusterState empty_state; std::shared_ptr<lib::ClusterState> initial_baseline; @@ -2678,37 +2727,4 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabl EXPECT_FALSE(def_rs.is_routable()); } -TEST_F(BucketDBUpdaterSnapshotTest, node_feature_sets_are_aggregated_from_nodes_and_propagated_to_stripes) { - lib::ClusterState state("distributor:1 storage:3"); - set_cluster_state(state); - uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1; - - // Known feature sets are initially empty. 
- auto stripes = distributor_stripes(); - for (auto* s : stripes) { - for (uint16_t i : {0, 1, 2}) { - EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining); - } - } - - ASSERT_EQ(expected_msgs, _sender.commands().size()); - for (uint32_t i = 0; i < _sender.commands().size(); i++) { - ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i), - dummy_buckets_to_return, [i](auto& reply) noexcept { - // Pretend nodes 1 and 2 are on a shiny version with unordered merge chaining supported. - // Node 0 does not support the fanciness. - if (i > 0) { - reply.supported_node_features().unordered_merge_chaining = true; - } - })); - } - - // Node features should be propagated to all stripes - for (auto* s : stripes) { - EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining); - EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining); - EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining); - } -} - } diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp index 613f0f6ce09..16be7733c1a 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp @@ -311,14 +311,12 @@ bool TopLevelBucketDBUpdater::onRequestBucketInfoReply( const std::shared_ptr<api::RequestBucketInfoReply>& repl) { - if (pending_cluster_state_accepted(repl)) { - return true; - } - return false; + attempt_accept_reply_by_current_pending_state(repl); + return true; } -bool -TopLevelBucketDBUpdater::pending_cluster_state_accepted( +void +TopLevelBucketDBUpdater::attempt_accept_reply_by_current_pending_state( const std::shared_ptr<api::RequestBucketInfoReply>& repl) { if (_pending_cluster_state.get() @@ -328,11 +326,14 @@ 
TopLevelBucketDBUpdater::pending_cluster_state_accepted( auto guard = _stripe_accessor.rendezvous_and_hold_all(); process_completed_pending_cluster_state(*guard); } - return true; + } else { + // Reply is not recognized, so its corresponding command must have been + // sent by a previous, preempted cluster state. We must still swallow the + // reply to prevent it from being passed further down a storage chain that + // does not expect it. + LOG(spam, "Reply %s was not accepted by pending cluster state", + repl->toString().c_str()); } - LOG(spam, "Reply %s was not accepted by pending cluster state", - repl->toString().c_str()); - return false; } void diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h index b1065e708a4..d8e49d5c383 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h @@ -85,7 +85,7 @@ private: bool should_defer_state_enabling() const noexcept; bool has_pending_cluster_state() const; - bool pending_cluster_state_accepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl); + void attempt_accept_reply_by_current_pending_state(const std::shared_ptr<api::RequestBucketInfoReply>& repl); bool is_pending_cluster_state_completed() const; void process_completed_pending_cluster_state(StripeAccessGuard& guard); void activate_pending_cluster_state(StripeAccessGuard& guard); |