aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2021-11-30 15:58:21 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2021-11-30 16:08:12 +0000
commit77d2ebfdd7a8d38161ce0dcd77cc431ea1899ec1 (patch)
tree6d3c22d27275edc3c0d7a1da939e493c8e3e13a5 /storage
parentc105ea3963f924c3bd501e7f65cdaae3eb9edf56 (diff)
Don't let ignored bucket info reply be propagated out of distributor
If a reply arrives for a preempted cluster state it will be ignored. To avoid it being automatically sent further down the storage chain we still have to treat it as handled. Otherwise a scary looking but otherwise benign "unhandled message" warning will be emitted in the Vespa log. Also move an existing test to the correct test fixture.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp82
-rw-r--r--storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp21
-rw-r--r--storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h2
3 files changed, 61 insertions, 44 deletions
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index 3ed5e9f4a8d..1632867b627 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -2548,6 +2548,55 @@ TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_onl
}
}
+TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_and_propagated_to_stripes) {
+ lib::ClusterState state("distributor:1 storage:3");
+ set_cluster_state(state);
+ uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1;
+
+ // Known feature sets are initially empty.
+ auto stripes = distributor_stripes();
+ for (auto* s : stripes) {
+ for (uint16_t i : {0, 1, 2}) {
+ EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining);
+ }
+ }
+
+ ASSERT_EQ(expected_msgs, _sender.commands().size());
+ for (uint32_t i = 0; i < _sender.commands().size(); i++) {
+ ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i),
+ dummy_buckets_to_return, [i](auto& reply) noexcept {
+ // Pretend nodes 1 and 2 are on a shiny version with unordered merge chaining supported.
+ // Node 0 does not support the fanciness.
+ if (i > 0) {
+ reply.supported_node_features().unordered_merge_chaining = true;
+ }
+ }));
+ }
+
+ // Node features should be propagated to all stripes
+ for (auto* s : stripes) {
+ EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining);
+ EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining);
+ EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining);
+ }
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, outdated_bucket_info_reply_is_ignored) {
+ set_cluster_state("version:1 distributor:1 storage:1");
+ ASSERT_EQ(message_count(1), _sender.commands().size());
+ auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.commands().front());
+ _sender.clear();
+ // Force a new pending cluster state which overwrites the pending one.
+ lib::ClusterState new_state("version:2 distributor:1 storage:2");
+ set_cluster_state(new_state);
+
+ const api::StorageMessageAddress& address(*req->getAddress());
+ bool handled = bucket_db_updater().onRequestBucketInfoReply(
+ make_fake_bucket_reply(new_state, *req, address.getIndex(), 0, 0));
+ EXPECT_TRUE(handled); // Should be returned as handled even though it's technically ignored.
+}
+
+
struct BucketDBUpdaterSnapshotTest : TopLevelBucketDBUpdaterTest {
lib::ClusterState empty_state;
std::shared_ptr<lib::ClusterState> initial_baseline;
@@ -2678,37 +2727,4 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabl
EXPECT_FALSE(def_rs.is_routable());
}
-TEST_F(BucketDBUpdaterSnapshotTest, node_feature_sets_are_aggregated_from_nodes_and_propagated_to_stripes) {
- lib::ClusterState state("distributor:1 storage:3");
- set_cluster_state(state);
- uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1;
-
- // Known feature sets are initially empty.
- auto stripes = distributor_stripes();
- for (auto* s : stripes) {
- for (uint16_t i : {0, 1, 2}) {
- EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining);
- }
- }
-
- ASSERT_EQ(expected_msgs, _sender.commands().size());
- for (uint32_t i = 0; i < _sender.commands().size(); i++) {
- ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i),
- dummy_buckets_to_return, [i](auto& reply) noexcept {
- // Pretend nodes 1 and 2 are on a shiny version with unordered merge chaining supported.
- // Node 0 does not support the fanciness.
- if (i > 0) {
- reply.supported_node_features().unordered_merge_chaining = true;
- }
- }));
- }
-
- // Node features should be propagated to all stripes
- for (auto* s : stripes) {
- EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining);
- EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining);
- EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining);
- }
-}
-
}
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
index 613f0f6ce09..16be7733c1a 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
@@ -311,14 +311,12 @@ bool
TopLevelBucketDBUpdater::onRequestBucketInfoReply(
const std::shared_ptr<api::RequestBucketInfoReply>& repl)
{
- if (pending_cluster_state_accepted(repl)) {
- return true;
- }
- return false;
+ attempt_accept_reply_by_current_pending_state(repl);
+ return true;
}
-bool
-TopLevelBucketDBUpdater::pending_cluster_state_accepted(
+void
+TopLevelBucketDBUpdater::attempt_accept_reply_by_current_pending_state(
const std::shared_ptr<api::RequestBucketInfoReply>& repl)
{
if (_pending_cluster_state.get()
@@ -328,11 +326,14 @@ TopLevelBucketDBUpdater::pending_cluster_state_accepted(
auto guard = _stripe_accessor.rendezvous_and_hold_all();
process_completed_pending_cluster_state(*guard);
}
- return true;
+ } else {
+ // Reply is not recognized, so its corresponding command must have been
+ // sent by a previous, preempted cluster state. We must still swallow the
+ // reply to prevent it from being passed further down a storage chain that
+ // does not expect it.
+ LOG(spam, "Reply %s was not accepted by pending cluster state",
+ repl->toString().c_str());
}
- LOG(spam, "Reply %s was not accepted by pending cluster state",
- repl->toString().c_str());
- return false;
}
void
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
index b1065e708a4..d8e49d5c383 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
@@ -85,7 +85,7 @@ private:
bool should_defer_state_enabling() const noexcept;
bool has_pending_cluster_state() const;
- bool pending_cluster_state_accepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
+ void attempt_accept_reply_by_current_pending_state(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
bool is_pending_cluster_state_completed() const;
void process_completed_pending_cluster_state(StripeAccessGuard& guard);
void activate_pending_cluster_state(StripeAccessGuard& guard);