about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-03-15 16:05:22 +0000
committer Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-03-15 16:05:22 +0000
commit 00804dfe908b6a337f88c42ea3def0e1f1397474 (patch)
tree e4e58c7a54e830fc5a237fe0be945e1f62f40492
parent 1b959abd3224c00f0347a8078dc333abfdd3ce9f (diff)
Properly handle non-owned vs. missing buckets
Bonus: no more spurious "we have removed buckets" log messages caused by ownership changes. Also ensure that we BUSY-bounce operations in `ExternalOperationHandler` when there is no actual state to send back in a `WrongDistributionReply`.
-rw-r--r-- storage/src/tests/distributor/bucketdbupdatertest.cpp 146
-rw-r--r-- storage/src/tests/distributor/externaloperationhandlertest.cpp 54
-rw-r--r-- storage/src/tests/storageserver/statemanagertest.cpp 30
-rw-r--r-- storage/src/vespa/storage/distributor/bucketdbupdater.cpp 23
-rw-r--r-- storage/src/vespa/storage/distributor/bucketdbupdater.h 8
-rw-r--r-- storage/src/vespa/storage/distributor/externaloperationhandler.cpp 28
-rw-r--r-- storage/src/vespa/storage/distributor/externaloperationhandler.h 1
-rw-r--r-- storage/src/vespa/storage/storageserver/fnetlistener.cpp 1
-rw-r--r-- storage/src/vespa/storage/storageserver/statemanager.cpp 11
-rw-r--r-- vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp 9
-rw-r--r-- vdslib/src/vespa/vdslib/state/cluster_state_bundle.h 2
11 files changed, 261 insertions, 52 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index fd9512b2bd6..0a82fadbf68 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -114,19 +114,12 @@ class BucketDBUpdaterTest : public CppUnit::TestFixture,
CPPUNIT_TEST(batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted);
CPPUNIT_TEST(global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection);
CPPUNIT_TEST(non_owned_buckets_moved_to_read_only_db_on_ownership_change);
+ CPPUNIT_TEST(buckets_no_longer_available_are_not_moved_to_read_only_database);
CPPUNIT_TEST(non_owned_buckets_purged_when_read_only_support_is_config_disabled);
+ CPPUNIT_TEST(deferred_activated_state_does_not_enable_state_until_activation_received);
+ CPPUNIT_TEST(read_only_db_cleared_once_pending_state_is_activated);
CPPUNIT_TEST_SUITE_END();
- /*
- * TODO tests
- * - buckets moved to read only db on ownership change
- * - even when self is down in pending state
- * - buckets NOT moved to read only db on content node down/maintenance
- * - read only db cleared when cluster state activated
- * - explicit cluster state activation path
- * - legacy implicit cluster state activation support
- */
-
public:
BucketDBUpdaterTest();
@@ -188,7 +181,10 @@ protected:
void batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted();
void global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection();
void non_owned_buckets_moved_to_read_only_db_on_ownership_change();
+ void buckets_no_longer_available_are_not_moved_to_read_only_database();
void non_owned_buckets_purged_when_read_only_support_is_config_disabled();
+ void deferred_activated_state_does_not_enable_state_until_activation_received();
+ void read_only_db_cleared_once_pending_state_is_activated();
auto &defaultDistributorBucketSpace() { return getBucketSpaceRepo().get(makeBucketSpace()); }
@@ -215,6 +211,19 @@ protected:
// cluster state component (just not by operations), so it would not have the expected semantics.
DistributorBucketSpaceRepo& read_only_repo() noexcept { return getReadOnlyBucketSpaceRepo(); }
+ BucketDatabase& mutable_default_db() noexcept {
+ return mutable_repo().get(FixedBucketSpaces::default_space()).getBucketDatabase();
+ }
+ BucketDatabase& mutable_global_db() noexcept {
+ return mutable_repo().get(FixedBucketSpaces::global_space()).getBucketDatabase();
+ }
+ BucketDatabase& read_only_default_db() noexcept {
+ return read_only_repo().get(FixedBucketSpaces::default_space()).getBucketDatabase();
+ }
+ BucketDatabase& read_only_global_db() noexcept {
+ return read_only_repo().get(FixedBucketSpaces::global_space()).getBucketDatabase();
+ }
+
static std::string getNodeList(std::vector<uint16_t> nodes, size_t count);
std::string getNodeList(std::vector<uint16_t> nodes);
@@ -228,6 +237,10 @@ protected:
return messagesPerBucketSpace * _bucketSpaces.size();
}
+ void trigger_completed_but_not_yet_activated_transition(
+ vespalib::stringref initial_state, uint32_t initial_buckets, uint32_t initial_expected_msgs,
+ vespalib::stringref pending_state, uint32_t pending_buckets, uint32_t pending_expected_msgs);
+
public:
using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
void setUp() override {
@@ -391,8 +404,7 @@ public:
void setSystemState(const lib::ClusterState& state) {
const size_t sizeBeforeState = _sender.commands.size();
getBucketDBUpdater().onSetSystemState(
- std::shared_ptr<api::SetSystemStateCommand>(
- new api::SetSystemStateCommand(state)));
+ std::make_shared<api::SetSystemStateCommand>(state));
// A lot of test logic has the assumption that all messages sent as a
// result of cluster state changes will be in increasing index order
// (for simplicity, not because this is required for correctness).
@@ -401,6 +413,18 @@ public:
sortSentMessagesByIndex(_sender, sizeBeforeState);
}
+ void setClusterStateBundle(const lib::ClusterStateBundle& state) {
+ const size_t sizeBeforeState = _sender.commands.size();
+ getBucketDBUpdater().onSetSystemState(
+ std::make_shared<api::SetSystemStateCommand>(state));
+ sortSentMessagesByIndex(_sender, sizeBeforeState);
+ }
+
+ void activateClusterStateVersion(uint32_t version) {
+ getBucketDBUpdater().onActivateClusterStateVersion(
+ std::make_shared<api::ActivateClusterStateVersionCommand>(version));
+ }
+
void completeBucketInfoGathering(const lib::ClusterState& state,
size_t expectedMsgs,
uint32_t bucketCount = 1,
@@ -2640,7 +2664,7 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c
getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
- setSystemState(initial_state);
+ setClusterStateBundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size());
constexpr uint32_t n_buckets = 10;
@@ -2665,9 +2689,9 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c
});
CPPUNIT_ASSERT(!buckets_not_owned_in_pending_state.empty());
- setSystemState(pending_state);
+ setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation
- CPPUNIT_ASSERT_EQUAL(size_t(n_buckets - buckets_not_owned_in_pending_state.size()), default_db.size());
+ CPPUNIT_ASSERT_EQUAL(n_buckets - buckets_not_owned_in_pending_state.size(), default_db.size());
CPPUNIT_ASSERT_EQUAL(buckets_not_owned_in_pending_state.size(), read_only_db.size());
// TODO replace with gmock unordered set equality matcher
@@ -2679,6 +2703,26 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c
// TODO check global space too!
}
+// TODO dedupe setup stuff
+void BucketDBUpdaterTest::buckets_no_longer_available_are_not_moved_to_read_only_database() {
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
+ lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
+ setClusterStateBundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
+
+ CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size());
+ constexpr uint32_t n_buckets = 10;
+ completeBucketInfoGathering(initial_state, messageCount(4), n_buckets);
+ _sender.clear();
+
+ // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will
+ // cause some buckets to be entirely unavailable.
+ lib::ClusterState pending_state("distributor:1 storage:4 .0.s:d .1.s:m");
+ setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true));
+
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
+}
+
+// TODO must ensure this works correctly with cluster controller!
void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_config_disabled() {
getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
@@ -2690,13 +2734,79 @@ void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_con
completeBucketInfoGathering(initial_state, messageCount(4), n_buckets);
_sender.clear();
- auto& read_only_db = read_only_repo().get(makeBucketSpace()).getBucketDatabase();
- CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
lib::ClusterState pending_state("distributor:2 storage:4");
setSystemState(pending_state);
// No buckets should be moved into read only db
- CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
+}
+
+void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition(
+ vespalib::stringref initial_state_str,
+ uint32_t initial_buckets,
+ uint32_t initial_expected_msgs,
+ vespalib::stringref pending_state_str,
+ uint32_t pending_buckets,
+ uint32_t pending_expected_msgs)
+{
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
+ lib::ClusterState initial_state(initial_state_str);
+ setSystemState(initial_state);
+ // TODO decouple expected message count
+ CPPUNIT_ASSERT_EQUAL(messageCount(initial_expected_msgs), _sender.commands.size());
+ completeBucketInfoGathering(initial_state, messageCount(initial_expected_msgs), initial_buckets);
+ _sender.clear();
+
+ lib::ClusterState pending_state(pending_state_str); // Ownership change
+ setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true));
+ CPPUNIT_ASSERT_EQUAL(messageCount(pending_expected_msgs), _sender.commands.size());
+ completeBucketInfoGathering(pending_state, messageCount(pending_expected_msgs), pending_buckets);
+ _sender.clear();
+}
+
+void BucketDBUpdaterTest::deferred_activated_state_does_not_enable_state_until_activation_received() {
+ constexpr uint32_t n_buckets = 10;
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:1 storage:4", n_buckets, 4);
+
+ // Version should not be switched over yet
+ CPPUNIT_ASSERT_EQUAL(uint32_t(1), getDistributor().getClusterStateBundle().getVersion());
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size());
+
+ activateClusterStateVersion(2);
+
+ CPPUNIT_ASSERT_EQUAL(uint32_t(2), getDistributor().getClusterStateBundle().getVersion());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_global_db().size());
}
+void BucketDBUpdaterTest::read_only_db_cleared_once_pending_state_is_activated() {
+ constexpr uint32_t n_buckets = 10;
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:2 storage:4", n_buckets, 0);
+ activateClusterStateVersion(2);
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_global_db().size());
+}
+
+/*
+ * TODO tests
+ * - [X] buckets moved to read only db on ownership change
+ * - even when self is down in pending state
+ * - [X] buckets NOT moved to read only db on content node down/maintenance
+ * - [X] read only db cleared when cluster state activated
+ * - explicit cluster state activation path
+ * - legacy implicit cluster state activation support
+ * - return activation ACK if already activated
+ * - buckets merged on explicit activation
+ * - version check for activation msg
+ * - deferred bundle with non-deferred config?
+ */
+
+// TODO reconcile default deferred activation state of bundle between Java and C++!
+
}
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index ebf9d4e3bc8..76917f856f0 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -23,7 +23,9 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
CPPUNIT_TEST(testBucketSplitMask);
CPPUNIT_TEST(mutating_operation_wdr_bounced_on_wrong_current_distribution);
CPPUNIT_TEST(mutating_operation_busy_bounced_on_wrong_pending_distribution);
+ CPPUNIT_TEST(mutating_operation_busy_bounced_if_no_cluster_state_received_yet);
CPPUNIT_TEST(read_only_operation_wdr_bounced_on_wrong_current_distribution);
+ CPPUNIT_TEST(read_only_operation_busy_bounced_if_no_cluster_state_received_yet);
CPPUNIT_TEST(reject_put_if_not_past_safe_time_point);
CPPUNIT_TEST(reject_remove_if_not_past_safe_time_point);
CPPUNIT_TEST(reject_update_if_not_past_safe_time_point);
@@ -59,6 +61,8 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
const vespalib::string& id) const;
std::shared_ptr<api::RemoveCommand> makeRemoveCommand(const vespalib::string& id) const;
+ void verify_busy_bounced_due_to_no_active_state(std::shared_ptr<api::StorageCommand> cmd);
+
Operation::SP start_operation_verify_not_rejected(std::shared_ptr<api::StorageCommand> cmd);
void start_operation_verify_rejected(std::shared_ptr<api::StorageCommand> cmd);
@@ -92,8 +96,10 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
protected:
void testBucketSplitMask();
void mutating_operation_wdr_bounced_on_wrong_current_distribution();
- void read_only_operation_wdr_bounced_on_wrong_current_distribution();
void mutating_operation_busy_bounced_on_wrong_pending_distribution();
+ void mutating_operation_busy_bounced_if_no_cluster_state_received_yet();
+ void read_only_operation_wdr_bounced_on_wrong_current_distribution();
+ void read_only_operation_busy_bounced_if_no_cluster_state_received_yet();
void reject_put_if_not_past_safe_time_point();
void reject_remove_if_not_past_safe_time_point();
void reject_update_if_not_past_safe_time_point();
@@ -254,7 +260,7 @@ void
ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_distribution()
{
createLinks();
- std::string state("distributor:2 storage:2");
+ std::string state("version:1 distributor:2 storage:2");
setupDistributor(1, 2, state);
document::BucketId bucket(findNonOwnedUserBucketInState(state));
@@ -266,7 +272,7 @@ ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_di
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
CPPUNIT_ASSERT_EQUAL(
std::string("ReturnCode(WRONG_DISTRIBUTION, "
- "distributor:2 storage:2)"),
+ "version:1 distributor:2 storage:2)"),
_sender.replies[0]->getResult().toString());
}
@@ -274,7 +280,7 @@ void
ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_distribution()
{
createLinks();
- std::string state("distributor:2 storage:2");
+ std::string state("version:1 distributor:2 storage:2");
setupDistributor(1, 2, state);
document::BucketId bucket(findNonOwnedUserBucketInState(state));
@@ -286,7 +292,7 @@ ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_d
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
CPPUNIT_ASSERT_EQUAL(
std::string("ReturnCode(WRONG_DISTRIBUTION, "
- "distributor:2 storage:2)"),
+ "version:1 distributor:2 storage:2)"),
_sender.replies[0]->getResult().toString());
}
@@ -315,6 +321,34 @@ ExternalOperationHandlerTest::mutating_operation_busy_bounced_on_wrong_pending_d
_sender.replies[0]->getResult().toString());
}
+void
+ExternalOperationHandlerTest::verify_busy_bounced_due_to_no_active_state(std::shared_ptr<api::StorageCommand> cmd)
+{
+ createLinks();
+ std::string state{}; // No version --> not yet received
+ setupDistributor(1, 2, state);
+
+ Operation::SP genOp;
+ CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp));
+ CPPUNIT_ASSERT(!genOp.get());
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("ReturnCode(BUSY, No cluster state activated yet)"),
+ _sender.replies[0]->getResult().toString());
+}
+
+void
+ExternalOperationHandlerTest::mutating_operation_busy_bounced_if_no_cluster_state_received_yet()
+{
+ verify_busy_bounced_due_to_no_active_state(makeUpdateCommandForUser(12345));
+}
+
+void
+ExternalOperationHandlerTest::read_only_operation_busy_bounced_if_no_cluster_state_received_yet()
+{
+ verify_busy_bounced_due_to_no_active_state(makeGetCommandForUser(12345));
+}
+
using TimePoint = ExternalOperationHandler::TimePoint;
using namespace std::literals::chrono_literals;
@@ -322,7 +356,7 @@ void ExternalOperationHandlerTest::assert_rejection_due_to_unsafe_time(
std::shared_ptr<api::StorageCommand> cmd)
{
createLinks();
- setupDistributor(1, 2, "distributor:1 storage:1");
+ setupDistributor(1, 2, "version:1 distributor:1 storage:1");
getClock().setAbsoluteTimeInSeconds(9);
getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s));
@@ -357,7 +391,7 @@ void ExternalOperationHandlerTest::reject_update_if_not_past_safe_time_point() {
void ExternalOperationHandlerTest::get_not_rejected_by_unsafe_time_point() {
createLinks();
- setupDistributor(1, 2, "distributor:1 storage:1");
+ setupDistributor(1, 2, "version:1 distributor:1 storage:1");
getClock().setAbsoluteTimeInSeconds(9);
getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s));
@@ -372,7 +406,7 @@ void ExternalOperationHandlerTest::get_not_rejected_by_unsafe_time_point() {
void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached() {
createLinks();
- setupDistributor(1, 2, "distributor:1 storage:1");
+ setupDistributor(1, 2, "version:1 distributor:1 storage:1");
getClock().setAbsoluteTimeInSeconds(10);
getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s));
@@ -390,7 +424,7 @@ void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached
void ExternalOperationHandlerTest::set_up_distributor_for_sequencing_test() {
createLinks();
- setupDistributor(1, 2, "distributor:1 storage:1");
+ setupDistributor(1, 2, "version:1 distributor:1 storage:1");
}
Operation::SP ExternalOperationHandlerTest::start_operation_verify_not_rejected(
@@ -518,7 +552,7 @@ void ExternalOperationHandlerTest::sequencing_can_be_explicitly_config_disabled(
void ExternalOperationHandlerTest::gets_are_started_with_mutable_db_outside_transition_period() {
createLinks();
- std::string current = "distributor:1 storage:3";
+ std::string current = "version:1 distributor:1 storage:3";
setupDistributor(1, 3, current);
getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp
index 19f414482db..cdf990fa28f 100644
--- a/storage/src/tests/storageserver/statemanagertest.cpp
+++ b/storage/src/tests/storageserver/statemanagertest.cpp
@@ -37,6 +37,7 @@ struct StateManagerTest : public CppUnit::TestFixture {
void can_explicitly_send_get_node_state_reply();
void explicit_node_state_replying_without_pending_request_immediately_replies_on_next_request();
void immediate_node_state_replying_is_tracked_per_controller();
+ void activation_command_is_bounced_with_current_cluster_state_version();
CPPUNIT_TEST_SUITE(StateManagerTest);
CPPUNIT_TEST(testSystemState);
@@ -45,8 +46,10 @@ struct StateManagerTest : public CppUnit::TestFixture {
CPPUNIT_TEST(can_explicitly_send_get_node_state_reply);
CPPUNIT_TEST(explicit_node_state_replying_without_pending_request_immediately_replies_on_next_request);
CPPUNIT_TEST(immediate_node_state_replying_is_tracked_per_controller);
+ CPPUNIT_TEST(activation_command_is_bounced_with_current_cluster_state_version);
CPPUNIT_TEST_SUITE_END();
+ void force_current_cluster_state_version(uint32_t version);
void mark_reported_node_state_up();
void send_down_get_node_state_request(uint16_t controller_index);
void assert_ok_get_node_state_reply_sent_and_clear();
@@ -101,6 +104,12 @@ StateManagerTest::tearDown() {
_metricManager.reset();
}
+void StateManagerTest::force_current_cluster_state_version(uint32_t version) {
+ ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState());
+ state.setVersion(version);
+ _manager->setClusterStateBundle(lib::ClusterStateBundle(state));
+}
+
#define GET_ONLY_OK_REPLY(varname) \
{ \
CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies()); \
@@ -236,9 +245,7 @@ StateManagerTest::testReportedNodeState()
}
void StateManagerTest::current_cluster_state_version_is_included_in_host_info_json() {
- ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState());
- state.setVersion(123);
- _manager->setClusterStateBundle(lib::ClusterStateBundle(state));
+ force_current_cluster_state_version(123);
std::string nodeInfoString(_manager->getNodeInfo());
vespalib::Memory goldenMemory(nodeInfoString);
@@ -343,4 +350,21 @@ void StateManagerTest::immediate_node_state_replying_is_tracked_per_controller()
CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies());
}
+void StateManagerTest::activation_command_is_bounced_with_current_cluster_state_version() {
+ force_current_cluster_state_version(12345);
+
+ auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340);
+ cmd->setTimeout(10000000);
+ cmd->setSourceIndex(0);
+ _upper->sendDown(cmd);
+
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies());
+ std::shared_ptr<api::StorageReply> reply;
+ GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY, reply->getType());
+ auto& activate_reply = dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply);
+ CPPUNIT_ASSERT_EQUAL(uint32_t(12340), activate_reply.activateVersion());
+ CPPUNIT_ASSERT_EQUAL(uint32_t(12345), activate_reply.actualVersion());
+}
+
} // storage
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 20ec67fdb14..202455098db 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -121,7 +121,6 @@ void
BucketDBUpdater::removeSuperfluousBuckets(
const lib::ClusterStateBundle& newState)
{
- // TODO too many indirections to get at config.
const bool move_to_read_only_db = _distributorComponent.getDistributor().getConfig()
.allowStaleReadsDuringClusterStateTransitions();
for (auto &elem : _distributorComponent.getBucketSpaceRepo()) {
@@ -140,15 +139,18 @@ BucketDBUpdater::removeSuperfluousBuckets(
_distributorComponent.getDistributor().getStorageNodeUpStates());
bucketDb.forEach(proc);
- // TODO vec of Entry instead to avoid lookup and remove? Uses more transient memory...
for (const auto & bucket : proc.getBucketsToRemove()) {
+ bucketDb.remove(bucket);
+ }
+ // TODO vec of Entry instead to avoid lookup and remove? Uses more transient memory...
+ for (const auto& bucket : proc.getNonOwnedBuckets()) {
if (move_to_read_only_db) {
- // TODO explicit transfer function?
auto db_entry = bucketDb.get(bucket);
readOnlyDb.update(db_entry); // TODO Entry move support
}
bucketDb.remove(bucket);
}
+
}
}
@@ -546,11 +548,17 @@ void
BucketDBUpdater::processCompletedPendingClusterState()
{
if (_pendingClusterState->isDeferred()) {
+ LOG(debug, "Deferring completion of pending cluster state version %u until explicitly activated",
+ _pendingClusterState->clusterStateVersion());
assert(_pendingClusterState->hasCommand()); // Deferred transitions should only ever be created by state commands.
// Sending down SetSystemState command will reach the state manager and a reply
// will be auto-sent back to the cluster controller in charge. Once this happens,
// it will send an explicit activation command once all distributors have reported
// that their pending cluster states have completed.
+ // A booting distributor will treat itself as "system Up" before the state has actually
+ // taken effect via activation. External operation handler will keep operations from
+ // actually being scheduled until state has been activated. The external operation handler
+ // needs to be explicitly aware of the case where no state has yet to be activated.
_distributorComponent.getDistributor().getMessageSender().sendDown(
_pendingClusterState->getCommand());
_pendingClusterState->clearCommand();
@@ -567,6 +575,7 @@ BucketDBUpdater::activatePendingClusterState()
_pendingClusterState->mergeIntoBucketDatabases();
if (_pendingClusterState->isVersionedTransition()) {
+ LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion());
enableCurrentClusterStateBundleInDistributor();
if (_pendingClusterState->hasCommand()) {
_distributorComponent.getDistributor().getMessageSender().sendDown(
@@ -574,6 +583,7 @@ BucketDBUpdater::activatePendingClusterState()
}
addCurrentStateToClusterStateHistory();
} else {
+ LOG(debug, "Activating pending distribution config");
// TODO distribution changes cannot currently be deferred as they are not
// initiated by the cluster controller!
_distributorComponent.getDistributor().notifyDistributionChangeEnabled();
@@ -583,13 +593,16 @@ BucketDBUpdater::activatePendingClusterState()
_outdatedNodesMap.clear();
sendAllQueuedBucketRechecks();
completeTransitionTimer();
+ for (auto& space : _distributorComponent.getReadOnlyBucketSpaceRepo()) {
+ space.second->getBucketDatabase().clear();
+ }
}
void
BucketDBUpdater::enableCurrentClusterStateBundleInDistributor()
{
const lib::ClusterStateBundle& state(
- _pendingClusterState->getCommand()->getClusterStateBundle());
+ _pendingClusterState->getNewClusterStateBundle());
LOG(debug,
"BucketDBUpdater finished processing state %s",
@@ -771,7 +784,7 @@ BucketDBUpdater::NodeRemover::process(BucketDatabase::Entry& e)
return true;
}
if (!distributorOwnsBucket(bucketId)) {
- _removedBuckets.push_back(bucketId);
+ _nonOwnedBuckets.push_back(bucketId);
return true;
}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 973495d2007..ee98e398f6d 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -203,6 +203,8 @@ private:
const char* upStates)
: _oldState(oldState),
_state(s),
+ _nonOwnedBuckets(),
+ _removedBuckets(),
_localIndex(localIndex),
_distribution(distribution),
_upStates(upStates) {}
@@ -212,15 +214,19 @@ private:
void logRemove(const document::BucketId& bucketId, const char* msg) const;
bool distributorOwnsBucket(const document::BucketId&) const;
- const std::vector<document::BucketId>& getBucketsToRemove() const {
+ const std::vector<document::BucketId>& getBucketsToRemove() const noexcept {
return _removedBuckets;
}
+ const std::vector<document::BucketId>& getNonOwnedBuckets() const noexcept {
+ return _nonOwnedBuckets;
+ }
private:
void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const;
void removeEmptyBucket(const document::BucketId& bucketId);
const lib::ClusterState _oldState;
const lib::ClusterState _state;
+ std::vector<document::BucketId> _nonOwnedBuckets;
std::vector<document::BucketId> _removedBuckets;
uint16_t _localIndex;
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 5d3261a71f7..7d0681df88d 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -72,19 +72,27 @@ ExternalOperationHandler::checkSafeTimeReached(api::StorageCommand& cmd)
return true;
}
+void ExternalOperationHandler::bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result) {
+ api::StorageReply::UP reply(cmd.makeReply());
+ reply->setResult(result);
+ sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
+}
+
void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) {
// Distributor ownership is equal across cluster states, so always send back default state.
// This also helps client avoid getting confused by possibly observing different actual
- // states for global/non-global document types for the same state version.
- auto cluster_state_str = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space())
- .getClusterState().toString();
- LOG(debug, "Got message with wrong distribution, sending back state '%s'",
- cluster_state_str.c_str());
-
- api::StorageReply::UP reply(cmd.makeReply());
- api::ReturnCode ret(api::ReturnCode::WRONG_DISTRIBUTION, cluster_state_str);
- reply->setResult(ret);
- sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
+ // (derived) state strings for global/non-global document types for the same state version.
+ // Similarly, if we've yet to activate any version at all we send back BUSY instead
+ // of a suspiciously empty WrongDistributionReply.
+ const auto& cluster_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState();
+ if (cluster_state.getVersion() != 0) {
+ auto cluster_state_str = cluster_state.toString();
+ LOG(debug, "Got message with wrong distribution, sending back state '%s'", cluster_state_str.c_str());
+ bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::WRONG_DISTRIBUTION, cluster_state_str));
+ } else { // Only valid for empty startup state
+ LOG(debug, "Got message with wrong distribution, but no cluster state activated yet. Sending back BUSY");
+ bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::BUSY, "No cluster state activated yet"));
+ }
}
void ExternalOperationHandler::bounce_with_busy_during_state_transition(
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h
index bd51a914ea8..655feb5d00c 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.h
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h
@@ -66,6 +66,7 @@ private:
void bounce_with_busy_during_state_transition(api::StorageCommand& cmd,
const lib::ClusterState& current_state,
const lib::ClusterState& pending_state);
+ void bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result);
bool checkSafeTimeReached(api::StorageCommand& cmd);
api::ReturnCode makeSafeTimeRejectionResult(TimePoint unsafeTime);
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
index 3654b17a2a7..4e9fdc99a8a 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
@@ -208,6 +208,7 @@ void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) {
req->SetError(RPCRequestWrapper::ERR_BAD_REQUEST, e.what());
return;
}
+ LOG(info, "Got state bundle %s", state_bundle->toString().c_str()); // TODO
// TODO add constructor taking in shared_ptr directly instead?
auto cmd = std::make_shared<api::SetSystemStateCommand>(*state_bundle);
diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp
index 02b9a765dd8..af01a880fea 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.cpp
+++ b/storage/src/vespa/storage/storageserver/statemanager.cpp
@@ -518,11 +518,12 @@ bool
StateManager::onActivateClusterStateVersion(
const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd)
{
- // TODO we probably don't want to invoke listeners here? but just bounce with
- // currently activated bundle version?
- // Must ensure that layer above (i.e. distributor) maintains strict operation
- // ordering.
- sendUp(std::make_shared<api::ActivateClusterStateVersionReply>(*cmd));
+ auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(*cmd);
+ {
+ vespalib::LockGuard lock(_stateLock);
+ reply->setActualVersion(_systemState ? _systemState->getVersion() : 0);
+ }
+ sendUp(reply);
return true;
}
diff --git a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
index 68a279f04f8..ff633c02fad 100644
--- a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
+++ b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.cpp
@@ -4,6 +4,7 @@
#include "cluster_state_bundle.h"
#include "clusterstate.h"
#include <iostream>
+#include <sstream>
namespace storage::lib {
@@ -78,6 +79,14 @@ ClusterStateBundle::operator==(const ClusterStateBundle &rhs) const
return true;
}
+std::string
+ClusterStateBundle::toString() const
+{
+ std::ostringstream os;
+ os << *this;
+ return os.str();
+}
+
std::ostream& operator<<(std::ostream& os, const ClusterStateBundle& bundle) {
os << "ClusterStateBundle('" << *bundle.getBaselineClusterState();
if (!bundle.getDerivedClusterStates().empty()) {
diff --git a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
index a9e84225c1f..d0b052766ff 100644
--- a/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
+++ b/vdslib/src/vespa/vdslib/state/cluster_state_bundle.h
@@ -5,6 +5,7 @@
#include <vespa/document/bucket/bucketspace.h>
#include <unordered_map>
#include <iosfwd>
+#include <string>
namespace storage::lib {
@@ -40,6 +41,7 @@ public:
}
uint32_t getVersion() const;
bool deferredActivation() const noexcept { return _deferredActivation; }
+ std::string toString() const;
bool operator==(const ClusterStateBundle &rhs) const;
bool operator!=(const ClusterStateBundle &rhs) const { return !operator==(rhs); }
};