aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2021-09-08 15:53:17 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2021-09-08 15:53:17 +0000
commitc99b6ca8bdb1e99d123e47f25ed25da28ae1e2ce (patch)
treebf1aadea261e9302d982fdbd5443c5a0ea7f147d /storage/src
parent490a718a6036b05dca9935ea177c1af514ed10dc (diff)
Port additional DB updater tests and fix delayed sending regression
Addresses a missing piece of functionality in the new code path where queued bucket rechecks during a pending cluster state time window would not be sent as expected when the pending state has been completed and activated.
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp3
-rw-r--r--storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp161
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h1
5 files changed, 173 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp
index cb61c1bb009..6cca6df9f80 100644
--- a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp
@@ -1053,6 +1053,7 @@ TEST_F(LegacyBucketDBUpdaterTest, recheck_node) {
EXPECT_EQ(api::BucketInfo(20,10,12, 50, 60, true, true), copy->getBucketInfo());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, notify_bucket_change) {
enableDistributorClusterState("distributor:1 storage:1");
@@ -1116,6 +1117,7 @@ TEST_F(LegacyBucketDBUpdaterTest, notify_bucket_change) {
dumpBucket(document::BucketId(16, 2)));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, notify_bucket_change_from_node_down) {
enableDistributorClusterState("distributor:1 storage:2");
@@ -1162,6 +1164,7 @@ TEST_F(LegacyBucketDBUpdaterTest, notify_bucket_change_from_node_down) {
dumpBucket(document::BucketId(16, 1)));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
/**
* Test that NotifyBucketChange received while there's a pending cluster state
* waits until the cluster state has been enabled as current before it sends off
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index de0b6b22358..70e5afaed43 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -284,6 +284,12 @@ public:
ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(num_buckets, state));
}
+ void set_and_enable_cluster_state(const lib::ClusterState& state, uint32_t expected_msgs, uint32_t n_buckets) {
+ _sender.clear();
+ set_cluster_state(state);
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, expected_msgs, n_buckets));
+ }
+
};
TopLevelBucketDBUpdaterTest::TopLevelBucketDBUpdaterTest()
@@ -786,4 +792,159 @@ TEST_F(TopLevelBucketDBUpdaterTest, recheck_node) {
EXPECT_EQ(api::BucketInfo(20,10,12, 50, 60, true, true), copy->getBucketInfo());
}
+TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change) {
+ enable_distributor_cluster_state("distributor:1 storage:1");
+
+ add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=1234");
+ _sender.replies().clear();
+
+ {
+ api::BucketInfo info(1, 2, 3, 4, 5, true, true);
+ auto cmd = std::make_shared<api::NotifyBucketChangeCommand>(
+ makeDocumentBucket(document::BucketId(16, 1)), info);
+ cmd->setSourceIndex(0);
+ stripe_of_bucket(document::BucketId(16, 1)).bucket_db_updater().onNotifyBucketChange(cmd);
+ }
+
+ {
+ api::BucketInfo info(10, 11, 12, 13, 14, false, false);
+ auto cmd = std::make_shared<api::NotifyBucketChangeCommand>(
+ makeDocumentBucket(document::BucketId(16, 2)), info);
+ cmd->setSourceIndex(0);
+ stripe_of_bucket(document::BucketId(16, 2)).bucket_db_updater().onNotifyBucketChange(cmd);
+ }
+
+ // Must receive reply
+ ASSERT_EQ(size_t(2), _sender.replies().size());
+
+ for (int i = 0; i < 2; ++i) {
+ ASSERT_EQ(MessageType::NOTIFYBUCKETCHANGE_REPLY, _sender.reply(i)->getType());
+ }
+
+ // No database update until request bucket info replies have been received.
+ EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234,"
+ "trusted=false,active=false,ready=false)"),
+ dump_bucket(document::BucketId(16, 1)));
+ EXPECT_EQ(std::string("NONEXISTING"), dump_bucket(document::BucketId(16, 2)));
+
+ ASSERT_EQ(size_t(2), _sender.commands().size());
+
+ std::vector<api::BucketInfo> infos;
+ infos.push_back(api::BucketInfo(4567, 200, 2000, 400, 4000, true, true));
+ infos.push_back(api::BucketInfo(8999, 300, 3000, 500, 5000, false, false));
+
+ for (int i = 0; i < 2; ++i) {
+ auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(i));
+ ASSERT_EQ(size_t(1), rbi.getBuckets().size());
+ document::BucketId bucket_id(16, i + 1);
+ EXPECT_EQ(bucket_id, rbi.getBuckets()[0]);
+
+ auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi);
+ reply->getBucketInfo().push_back(api::RequestBucketInfoReply::Entry(bucket_id, infos[i]));
+ stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reply);
+ }
+
+ EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)"),
+ dump_bucket(document::BucketId(16, 1)));
+ EXPECT_EQ(std::string("BucketId(0x4000000000000002) : "
+ "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)"),
+ dump_bucket(document::BucketId(16, 2)));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change_from_node_down) {
+ enable_distributor_cluster_state("distributor:1 storage:2");
+
+ document::BucketId bucket_id(16, 1);
+ add_nodes_to_stripe_bucket_db(bucket_id, "1=1234");
+
+ _sender.replies().clear();
+
+ {
+ api::BucketInfo info(8999, 300, 3000, 500, 5000, false, false);
+ auto cmd = std::make_shared<api::NotifyBucketChangeCommand>(makeDocumentBucket(bucket_id), info);
+ cmd->setSourceIndex(0);
+ stripe_of_bucket(bucket_id).bucket_db_updater().onNotifyBucketChange(cmd);
+ }
+ // Enable here to avoid having request bucket info be silently swallowed
+ // (send_request_bucket_info drops message if node is down).
+ enable_distributor_cluster_state("distributor:1 storage:2 .0.s:d");
+
+ ASSERT_EQ("BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false,ready=false)",
+ dump_bucket(bucket_id));
+
+ ASSERT_EQ(size_t(1), _sender.replies().size());
+ ASSERT_EQ(MessageType::NOTIFYBUCKETCHANGE_REPLY, _sender.reply(0)->getType());
+
+ // Currently, this pending operation will be auto-flushed when the cluster state
+ // changes so the behavior is still correct. Keep this test around to prevent
+ // regressions here.
+ ASSERT_EQ(size_t(1), _sender.commands().size());
+ auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(0));
+ ASSERT_EQ(size_t(1), rbi.getBuckets().size());
+ EXPECT_EQ(bucket_id, rbi.getBuckets()[0]);
+
+ auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi);
+ reply->getBucketInfo().push_back(
+ api::RequestBucketInfoReply::Entry(
+ bucket_id,
+ api::BucketInfo(8999, 300, 3000, 500, 5000, false, false)));
+ stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reply);
+
+ // No change
+ EXPECT_EQ("BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false,ready=false)",
+ dump_bucket(bucket_id));
+}
+
+/**
+ * Test that NotifyBucketChange received while there's a pending cluster state
+ * waits until the cluster state has been enabled as current before it sends off
+ * the single bucket info requests. This is to prevent a race condition where
+ * the replies to bucket info requests for buckets that would be owned by the
+ * distributor in the pending state but not by the current state would be
+ * discarded when attempted inserted into the bucket database.
+ */
+TEST_F(TopLevelBucketDBUpdaterTest, notify_change_with_pending_state_queues_bucket_info_requests) {
+ set_cluster_state("distributor:1 storage:1");
+ ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size());
+
+ document::BucketId bucket_id(16, 1);
+ {
+ api::BucketInfo info(8999, 300, 3000, 500, 5000, false, false);
+ auto cmd(std::make_shared<api::NotifyBucketChangeCommand>(
+ makeDocumentBucket(bucket_id), info));
+ cmd->setSourceIndex(0);
+ stripe_of_bucket(bucket_id).bucket_db_updater().onNotifyBucketChange(cmd);
+ }
+
+ ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size());
+
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(lib::ClusterState("distributor:1 storage:1"),
+ _bucket_spaces.size(), 10));
+
+ ASSERT_EQ(_bucket_spaces.size() + 1, _sender.commands().size());
+
+ {
+ auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(_bucket_spaces.size()));
+ ASSERT_EQ(size_t(1), rbi.getBuckets().size());
+ EXPECT_EQ(bucket_id, rbi.getBuckets()[0]);
+ }
+ _sender.clear();
+
+ // Queue must be cleared once pending state is enabled.
+ {
+ lib::ClusterState state("distributor:1 storage:2");
+ uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 1;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state, expected_msgs, dummy_buckets_to_return));
+ }
+ ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size());
+ {
+ auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(0));
+ EXPECT_EQ(size_t(0), rbi.getBuckets().size());
+ }
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 8cb260455ed..1a9cb9f303c 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -985,6 +985,7 @@ void
DistributorStripe::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state,
bool has_bucket_ownership_change)
{
+ assert(!_use_legacy_mode);
// TODO STRIPE replace legacy func
enableClusterStateBundle(new_state);
if (has_bucket_ownership_change) {
@@ -995,6 +996,7 @@ DistributorStripe::enable_cluster_state_bundle(const lib::ClusterStateBundle& ne
const auto now = TimePoint(std::chrono::milliseconds(_component.getClock().getTimeInMillis().getTime()));
_externalOperationHandler.rejectFeedBeforeTimeReached(_ownershipSafeTimeCalc->safeTimePoint(now));
}
+ _bucketDBUpdater.handle_activated_cluster_state_bundle(); // Triggers resending of queued requests
}
void
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
index 06a9672ba50..c48434484d2 100644
--- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
@@ -170,6 +170,12 @@ StripeBucketDBUpdater::recheckBucketInfo(uint32_t nodeIdx,
sendRequestBucketInfo(nodeIdx, bucket, std::shared_ptr<MergeReplyGuard>());
}
+void
+StripeBucketDBUpdater::handle_activated_cluster_state_bundle()
+{
+ sendAllQueuedBucketRechecks();
+}
+
namespace {
class ReadOnlyDbMergingInserter : public BucketDatabase::MergingProcessor {
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
index 1456308c3d0..9bc91ca78e7 100644
--- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
@@ -46,6 +46,7 @@ public:
void flush();
const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const;
void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket);
+ void handle_activated_cluster_state_bundle();
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override;
bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) override;