summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/storageserver/fnet_listener_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/storageserver/fnet_listener_test.cpp')
-rw-r--r--storage/src/tests/storageserver/fnet_listener_test.cpp88
1 files changed, 77 insertions, 11 deletions
diff --git a/storage/src/tests/storageserver/fnet_listener_test.cpp b/storage/src/tests/storageserver/fnet_listener_test.cpp
index 84051041d25..d40b230d725 100644
--- a/storage/src/tests/storageserver/fnet_listener_test.cpp
+++ b/storage/src/tests/storageserver/fnet_listener_test.cpp
@@ -27,6 +27,9 @@ public:
CPPUNIT_TEST(set_distribution_rpc_is_immediately_failed_if_listener_is_closed);
CPPUNIT_TEST(overly_large_uncompressed_bundle_size_parameter_returns_rpc_error);
CPPUNIT_TEST(mismatching_uncompressed_bundle_size_parameter_returns_rpc_error);
+ CPPUNIT_TEST(true_deferred_activation_flag_can_be_roundtrip_encoded);
+ CPPUNIT_TEST(false_deferred_activation_flag_can_be_roundtrip_encoded);
+ CPPUNIT_TEST(activate_cluster_state_version_rpc_enqueues_command_with_version);
CPPUNIT_TEST_SUITE_END();
void baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle();
@@ -35,6 +38,9 @@ public:
void set_distribution_rpc_is_immediately_failed_if_listener_is_closed();
void overly_large_uncompressed_bundle_size_parameter_returns_rpc_error();
void mismatching_uncompressed_bundle_size_parameter_returns_rpc_error();
+ void true_deferred_activation_flag_can_be_roundtrip_encoded();
+ void false_deferred_activation_flag_can_be_roundtrip_encoded();
+ void activate_cluster_state_version_rpc_enqueues_command_with_version();
};
CPPUNIT_TEST_SUITE_REGISTRATION(FNetListenerTest);
@@ -54,24 +60,25 @@ struct DummyReturnHandler : FRT_IReturnHandler {
FNET_Connection* GetConnection() override { return nullptr; }
};
-struct Fixture {
+struct FixtureBase {
// TODO factor out Slobrok code to avoid need to set up live ports for unrelated tests
mbus::Slobrok slobrok;
vdstestlib::DirConfig config;
MockOperationEnqueuer enqueuer;
std::unique_ptr<FNetListener> fnet_listener;
- SlimeClusterStateBundleCodec codec;
DummyReturnHandler return_handler;
bool request_is_detached{false};
FRT_RPCRequest* bound_request{nullptr};
- Fixture() : config(getStandardConfig(true)) {
+ FixtureBase()
+ : config(getStandardConfig(true))
+ {
config.getConfig("stor-server").set("node_index", "1");
addSlobrokConfig(config, slobrok);
fnet_listener = std::make_unique<FNetListener>(enqueuer, config.getConfigId(), 0);
}
- ~Fixture() {
+ virtual ~FixtureBase() {
// Must destroy any associated message contexts that may have refs to FRT_Request
// instance _before_ we destroy the request itself.
enqueuer._enqueued.clear();
@@ -79,6 +86,12 @@ struct Fixture {
bound_request->SubRef();
}
}
+};
+
+struct SetStateFixture : FixtureBase {
+ SlimeClusterStateBundleCodec codec;
+
+ SetStateFixture() : FixtureBase() {}
void bind_request_params(EncodedClusterStateBundle& encoded_bundle, uint32_t uncompressed_length) {
bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting
@@ -123,6 +136,10 @@ struct Fixture {
lib::ClusterStateBundle dummy_baseline_bundle() const {
return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"));
}
+
+ lib::ClusterStateBundle dummy_baseline_bundle_with_deferred_activation(bool deferred) const {
+ return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"), {}, deferred);
+ }
};
std::shared_ptr<const lib::ClusterState> state_of(vespalib::stringref state) {
@@ -138,17 +155,17 @@ vespalib::string make_compressable_state_string() {
ss.str().data(), ss.str().data());
}
-}
+} // anon namespace
void FNetListenerTest::baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle() {
- Fixture f;
+ SetStateFixture f;
auto baseline = f.dummy_baseline_bundle();
f.assert_request_received_and_propagated(baseline);
}
void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle() {
- Fixture f;
+ SetStateFixture f;
lib::ClusterStateBundle spaces_bundle(
lib::ClusterState("version:123 distributor:3 storage:3"),
{{FixedBucketSpaces::default_space(), state_of("version:123 distributor:3 storage:3 .0.s:d")},
@@ -158,7 +175,7 @@ void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command
}
void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() {
- Fixture f;
+ SetStateFixture f;
auto state_str = make_compressable_state_string();
lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)};
@@ -171,24 +188,73 @@ void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() {
}
void FNetListenerTest::set_distribution_rpc_is_immediately_failed_if_listener_is_closed() {
- Fixture f;
+ SetStateFixture f;
f.create_request(f.dummy_baseline_bundle());
f.fnet_listener->close();
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN);
}
void FNetListenerTest::overly_large_uncompressed_bundle_size_parameter_returns_rpc_error() {
- Fixture f;
+ SetStateFixture f;
auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
f.bind_request_params(encoded_bundle, FNetListener::StateBundleMaxUncompressedSize + 1);
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
}
void FNetListenerTest::mismatching_uncompressed_bundle_size_parameter_returns_rpc_error() {
- Fixture f;
+ SetStateFixture f;
auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle());
f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100);
f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST);
}
+void FNetListenerTest::true_deferred_activation_flag_can_be_roundtrip_encoded() {
+ SetStateFixture f;
+ f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(true));
+
+}
+
+void FNetListenerTest::false_deferred_activation_flag_can_be_roundtrip_encoded() {
+ SetStateFixture f;
+ f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(false));
+}
+
+struct ActivateStateFixture : FixtureBase {
+ ActivateStateFixture() : FixtureBase() {}
+
+ void bind_request_params(uint32_t activate_version) {
+ bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting
+ auto* params = bound_request->GetParams();
+ params->AddInt32(activate_version);
+
+ bound_request->SetDetachedPT(&request_is_detached);
+ bound_request->SetReturnHandler(&return_handler);
+ }
+
+ void create_request(uint32_t activate_version) {
+ // Only 1 request allowed per fixture due to lifetime handling snags
+ assert(bound_request == nullptr);
+ bind_request_params(activate_version);
+ }
+
+ void assert_enqueued_operation_has_activate_version(uint32_t version) {
+ CPPUNIT_ASSERT(bound_request != nullptr);
+ CPPUNIT_ASSERT(request_is_detached);
+ CPPUNIT_ASSERT_EQUAL(size_t(1), enqueuer._enqueued.size());
+ auto& state_request = dynamic_cast<const api::ActivateClusterStateVersionCommand&>(*enqueuer._enqueued[0]);
+ CPPUNIT_ASSERT_EQUAL(version, state_request.version());
+ }
+
+ void assert_request_received_and_propagated(uint32_t activate_version) {
+ create_request(activate_version);
+ fnet_listener->RPC_activateClusterStateVersion(bound_request);
+ assert_enqueued_operation_has_activate_version(activate_version);
+ }
+};
+
+void FNetListenerTest::activate_cluster_state_version_rpc_enqueues_command_with_version() {
+ ActivateStateFixture f;
+ f.assert_request_received_and_propagated(1234567);
+}
+
}