summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-18 16:45:37 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-20 16:56:58 +0100
commitc3650cea23c1297e0db87b1bef9005dedda518d2 (patch)
treefc358517e7fc0c18a04d36229ecb924a1b8613c0 /storage
parent2d64b4b12971a307e1fa3af47bdca2bed6371dea (diff)
Test more BucketDBUpdater two-phase transition edge cases
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp149
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp3
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.cpp4
3 files changed, 98 insertions, 58 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 0a82fadbf68..7188d060b41 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -27,6 +27,7 @@ using document::test::makeBucketSpace;
using document::BucketSpace;
using document::FixedBucketSpaces;
using document::BucketId;
+using document::Bucket;
namespace storage::distributor {
@@ -118,6 +119,9 @@ class BucketDBUpdaterTest : public CppUnit::TestFixture,
CPPUNIT_TEST(non_owned_buckets_purged_when_read_only_support_is_config_disabled);
CPPUNIT_TEST(deferred_activated_state_does_not_enable_state_until_activation_received);
CPPUNIT_TEST(read_only_db_cleared_once_pending_state_is_activated);
+ CPPUNIT_TEST(read_only_db_is_populated_even_when_self_is_marked_down);
+ CPPUNIT_TEST(activate_cluster_state_request_with_mismatching_version_returns_actual_version);
+ CPPUNIT_TEST(activate_cluster_state_request_without_pending_transition_passes_message_through);
CPPUNIT_TEST_SUITE_END();
public:
@@ -185,6 +189,9 @@ protected:
void non_owned_buckets_purged_when_read_only_support_is_config_disabled();
void deferred_activated_state_does_not_enable_state_until_activation_received();
void read_only_db_cleared_once_pending_state_is_activated();
+ void read_only_db_is_populated_even_when_self_is_marked_down();
+ void activate_cluster_state_request_with_mismatching_version_returns_actual_version();
+ void activate_cluster_state_request_without_pending_transition_passes_message_through();
auto &defaultDistributorBucketSpace() { return getBucketSpaceRepo().get(makeBucketSpace()); }
@@ -413,17 +420,25 @@ public:
sortSentMessagesByIndex(_sender, sizeBeforeState);
}
- void setClusterStateBundle(const lib::ClusterStateBundle& state) {
+ void set_cluster_state_bundle(const lib::ClusterStateBundle& state) {
const size_t sizeBeforeState = _sender.commands.size();
getBucketDBUpdater().onSetSystemState(
std::make_shared<api::SetSystemStateCommand>(state));
sortSentMessagesByIndex(_sender, sizeBeforeState);
}
- void activateClusterStateVersion(uint32_t version) {
- getBucketDBUpdater().onActivateClusterStateVersion(
+ bool activate_cluster_state_version(uint32_t version) {
+ return getBucketDBUpdater().onActivateClusterStateVersion(
std::make_shared<api::ActivateClusterStateVersionCommand>(version));
}
+
+ void assert_has_activate_cluster_state_reply_with_actual_version(uint32_t version) {
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
+ auto* response = dynamic_cast<api::ActivateClusterStateVersionReply*>(_sender.replies.back().get());
+ CPPUNIT_ASSERT(response != nullptr);
+ CPPUNIT_ASSERT_EQUAL(version, response->actualVersion());
+ _sender.clear();
+ }
void completeBucketInfoGathering(const lib::ClusterState& state,
size_t expectedMsgs,
@@ -2647,15 +2662,23 @@ void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_u
namespace {
template <typename Func>
-void for_each_bucket(const BucketDatabase& db, Func f) {
+void for_each_bucket(const BucketDatabase& db, const document::BucketSpace& space, Func&& f) {
BucketId last(0);
auto e = db.getNext(last);
while (e.valid()) {
- f(e);
+ f(space, e);
e = db.getNext(e.getBucketId());
}
}
+template <typename Func>
+void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) {
+ for (const auto& space : repo) {
+ // TODO needs to propagate space
+ for_each_bucket(space.second->getBucketDatabase(), space.first, f);
+ }
+}
+
}
using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder;
@@ -2664,82 +2687,75 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c
getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
- setClusterStateBundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
+ set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size());
constexpr uint32_t n_buckets = 10;
completeBucketInfoGathering(initial_state, messageCount(4), n_buckets);
_sender.clear();
- auto& default_db = mutable_repo().get(makeBucketSpace()).getBucketDatabase();
- auto& read_only_db = read_only_repo().get(makeBucketSpace()).getBucketDatabase();
-
- CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), default_db.size());
- CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size());
+ CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), mutable_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), mutable_global_db().size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size());
lib::ClusterState pending_state("distributor:2 storage:4");
- // TODO iterate over buckets with spaces instead
- std::unordered_set<BucketId, BucketId::hash> buckets_not_owned_in_pending_state;
- for_each_bucket(default_db, [&](const auto& entry) {
+ std::unordered_set<Bucket, Bucket::hash> buckets_not_owned_in_pending_state;
+ for_each_bucket(mutable_repo(), [&](const auto& space, const auto& entry) {
if (!getBucketDBUpdater().getDistributorComponent()
.ownsBucketInState(pending_state, makeDocumentBucket(entry.getBucketId()))) {
- buckets_not_owned_in_pending_state.insert(entry.getBucketId());
+ buckets_not_owned_in_pending_state.insert(Bucket(space, entry.getBucketId()));
}
});
CPPUNIT_ASSERT(!buckets_not_owned_in_pending_state.empty());
- setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation
+ set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation
- CPPUNIT_ASSERT_EQUAL(n_buckets - buckets_not_owned_in_pending_state.size(), default_db.size());
- CPPUNIT_ASSERT_EQUAL(buckets_not_owned_in_pending_state.size(), read_only_db.size());
+ const auto buckets_not_owned_per_space = (buckets_not_owned_in_pending_state.size() / 2); // 2 spaces
+ const auto expected_mutable_buckets = n_buckets - buckets_not_owned_per_space;
+ CPPUNIT_ASSERT_EQUAL(expected_mutable_buckets, mutable_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(expected_mutable_buckets, mutable_global_db().size());
+ CPPUNIT_ASSERT_EQUAL(buckets_not_owned_per_space, read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(buckets_not_owned_per_space, read_only_global_db().size());
- // TODO replace with gmock unordered set equality matcher
- for_each_bucket(read_only_db, [&](const auto& entry) {
- CPPUNIT_ASSERT(buckets_not_owned_in_pending_state.find(entry.getBucketId())
+ for_each_bucket(read_only_repo(), [&](const auto& space, const auto& entry) {
+ CPPUNIT_ASSERT(buckets_not_owned_in_pending_state.find(Bucket(space, entry.getBucketId()))
!= buckets_not_owned_in_pending_state.end());
});
-
- // TODO check global space too!
}
-// TODO dedupe setup stuff
void BucketDBUpdaterTest::buckets_no_longer_available_are_not_moved_to_read_only_database() {
- getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
- lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
- setClusterStateBundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
-
- CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size());
constexpr uint32_t n_buckets = 10;
- completeBucketInfoGathering(initial_state, messageCount(4), n_buckets);
- _sender.clear();
-
// No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will
// cause some buckets to be entirely unavailable.
- lib::ClusterState pending_state("distributor:1 storage:4 .0.s:d .1.s:m");
- setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true));
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:1 storage:4 .0.s:d .1.s:m", n_buckets, 0);
CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size());
}
-// TODO must ensure this works correctly with cluster controller!
void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_config_disabled() {
getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
- setSystemState(initial_state);
+ set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size());
constexpr uint32_t n_buckets = 10;
completeBucketInfoGathering(initial_state, messageCount(4), n_buckets);
_sender.clear();
+ // Nothing in read-only DB after first bulk load of buckets.
CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size());
lib::ClusterState pending_state("distributor:2 storage:4");
setSystemState(pending_state);
- // No buckets should be moved into read only db
+ // No buckets should be moved into read only db after ownership changes.
CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size());
}
void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition(
@@ -2753,13 +2769,12 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition(
getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
lib::ClusterState initial_state(initial_state_str);
setSystemState(initial_state);
- // TODO decouple expected message count
CPPUNIT_ASSERT_EQUAL(messageCount(initial_expected_msgs), _sender.commands.size());
completeBucketInfoGathering(initial_state, messageCount(initial_expected_msgs), initial_buckets);
_sender.clear();
lib::ClusterState pending_state(pending_state_str); // Ownership change
- setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true));
+ set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true));
CPPUNIT_ASSERT_EQUAL(messageCount(pending_expected_msgs), _sender.commands.size());
completeBucketInfoGathering(pending_state, messageCount(pending_expected_msgs), pending_buckets);
_sender.clear();
@@ -2776,7 +2791,7 @@ void BucketDBUpdaterTest::deferred_activated_state_does_not_enable_state_until_a
CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size());
CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size());
- activateClusterStateVersion(2);
+ CPPUNIT_ASSERT(!activate_cluster_state_version(2));
CPPUNIT_ASSERT_EQUAL(uint32_t(2), getDistributor().getClusterStateBundle().getVersion());
CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_default_db().size());
@@ -2787,26 +2802,50 @@ void BucketDBUpdaterTest::read_only_db_cleared_once_pending_state_is_activated()
constexpr uint32_t n_buckets = 10;
trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
"version:2 distributor:2 storage:4", n_buckets, 0);
- activateClusterStateVersion(2);
+ CPPUNIT_ASSERT(!activate_cluster_state_version(2));
CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_default_db().size());
CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_global_db().size());
}
-/*
- * TODO tests
- * - [X] buckets moved to read only db on ownership change
- * - even when self is down in pending state
- * - [X] buckets NOT moved to read only db on content node down/maintenance
- * - [X] read only db cleared when cluster state activated
- * - explicit cluster state activation path
- * - legacy implicit cluster state activation support
- * - return activation ACK if already activated
- * - buckets merged on explicit activation
- * - version check for activation msg
- * - deferred bundle with non-deferred config?
- */
+void BucketDBUpdaterTest::read_only_db_is_populated_even_when_self_is_marked_down() {
+ constexpr uint32_t n_buckets = 10;
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:1 .0.s:d storage:4", n_buckets, 0);
+
+ // State not yet activated, so read-only DBs have got all the buckets we used to have.
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), read_only_global_db().size());
+}
+
+void BucketDBUpdaterTest::activate_cluster_state_request_with_mismatching_version_returns_actual_version() {
+ constexpr uint32_t n_buckets = 10;
+ trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4,
+ "version:5 distributor:2 storage:4", n_buckets, 0);
+
+ CPPUNIT_ASSERT(activate_cluster_state_version(4)); // Too old version
+ assert_has_activate_cluster_state_reply_with_actual_version(5);
+
+ CPPUNIT_ASSERT(activate_cluster_state_version(6)); // More recent version than what has been observed
+ assert_has_activate_cluster_state_reply_with_actual_version(5);
+}
+
+void BucketDBUpdaterTest::activate_cluster_state_request_without_pending_transition_passes_message_through() {
+ constexpr uint32_t n_buckets = 10;
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:1 storage:4", n_buckets, 4);
+ // Activate version 2; no pending cluster state after this.
+ CPPUNIT_ASSERT(!activate_cluster_state_version(2));
+
+ // No pending cluster state for version 3, just passed through to be implicitly bounced by state manager.
+ // Note: state manager is not modelled in this test, so we just check that the message handler returns
+ // false (meaning "didn't take message ownership") and there's no auto-generated reply.
+ CPPUNIT_ASSERT(!activate_cluster_state_version(3));
+ CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.replies.size());
+}
-// TODO reconcile default deferred activation state of bundle between Java and C++!
+// TODO rename distributor config to imply two phase functionlity explicitly?
}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 202455098db..7516e10082c 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -259,7 +259,6 @@ BucketDBUpdater::onSetSystemState(
bool
BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd)
{
- // TODO test edges!
if (hasPendingClusterState() && _pendingClusterState->isVersionedTransition()) {
const auto pending_version = _pendingClusterState->clusterStateVersion();
if (pending_version == cmd->version()) {
@@ -281,7 +280,7 @@ BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::Activa
} else {
// Likely just a resend, but log warn for now to get a feel of how common it is.
LOG(warning, "Received cluster state activation command for version %u, which "
- "has no corresponding pending state. Resent operation?", cmd->version());
+ "has no corresponding pending state. Likely resent operation.", cmd->version());
}
// Fall through to next link in call chain that cares about this message.
return false;
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
index 4e9fdc99a8a..fb2967965c0 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
@@ -208,7 +208,7 @@ void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) {
req->SetError(RPCRequestWrapper::ERR_BAD_REQUEST, e.what());
return;
}
- LOG(info, "Got state bundle %s", state_bundle->toString().c_str()); // TODO
+ LOG(debug, "Got state bundle %s", state_bundle->toString().c_str());
// TODO add constructor taking in shared_ptr directly instead?
auto cmd = std::make_shared<api::SetSystemStateCommand>(*state_bundle);
@@ -228,6 +228,8 @@ void FNetListener::RPC_activateClusterStateVersion(FRT_RPCRequest* req) {
auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(activate_version);
cmd->setPriority(api::StorageMessage::VERYHIGH);
+ LOG(debug, "Got state activation request for version %u", activate_version);
+
detach_and_forward_to_enqueuer(std::move(cmd), req);
}