diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-08 13:42:12 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-14 13:22:14 +0000 |
commit | 54150406f3c490463bc5371c9480452da168bd5c (patch) | |
tree | 73721c84df79dd649c967a5fe9f0525431476af6 /storage/src/tests/storageserver/cluster_controller_rpc_api_service_test.cpp | |
parent | ea22ec7cb6e35cf591d08ffe898388a7f08593cc (diff) |
Basic functionality for direct RPC for StorageAPI communication
This has several advantages:
* Completely bypasses all MessageBus indirections
* Explicit setup of RPC thread pool
* Direct dispatch from RPC thread to persistence queue pool
* Better control of encoding/decoding and buffer usage
Diffstat (limited to 'storage/src/tests/storageserver/cluster_controller_rpc_api_service_test.cpp')
-rw-r--r-- | storage/src/tests/storageserver/cluster_controller_rpc_api_service_test.cpp | 242 |
1 files changed, 242 insertions, 0 deletions
diff --git a/storage/src/tests/storageserver/cluster_controller_rpc_api_service_test.cpp b/storage/src/tests/storageserver/cluster_controller_rpc_api_service_test.cpp new file mode 100644 index 00000000000..7510e0579ea --- /dev/null +++ b/storage/src/tests/storageserver/cluster_controller_rpc_api_service_test.cpp @@ -0,0 +1,242 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/storage/storageserver/message_enqueuer.h> +#include <vespa/storage/storageserver/rpcrequestwrapper.h> +#include <vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h> +#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> +#include <vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h> +#include <vespa/storageapi/message/state.h> +#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/vdstestlib/config/dirconfig.hpp> +#include <tests/common/testhelper.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vector> + +namespace storage::rpc { + +using document::FixedBucketSpaces; +using namespace ::testing; + +struct ClusterControllerApiRpcServiceTest : Test { +}; + +namespace { + +struct MockOperationEnqueuer : MessageEnqueuer { + std::vector<std::shared_ptr<api::StorageMessage>> _enqueued; + + void enqueue(std::shared_ptr<api::StorageMessage> msg) override { + _enqueued.emplace_back(std::move(msg)); + } +}; + +struct DummyReturnHandler : FRT_IReturnHandler { + void HandleReturn() override {} + FNET_Connection* GetConnection() override { return nullptr; } +}; + +struct FixtureBase { + mbus::Slobrok slobrok; + vdstestlib::DirConfig config; + MockOperationEnqueuer enqueuer; + std::unique_ptr<SharedRpcResources> shared_rpc_resources; + std::unique_ptr<ClusterControllerApiRpcService> cc_service; + DummyReturnHandler return_handler; + bool request_is_detached{false}; + FRT_RPCRequest* bound_request{nullptr}; + + FixtureBase() + : config(getStandardConfig(true)) + { + config.getConfig("stor-server").set("node_index", "1"); + addSlobrokConfig(config, slobrok); + + shared_rpc_resources = std::make_unique<SharedRpcResources>(config.getConfigId(), 0, 1); + cc_service = std::make_unique<ClusterControllerApiRpcService>(enqueuer, *shared_rpc_resources); + shared_rpc_resources->start_server_and_register_slobrok("my_cool_rpc_test"); + } + + virtual ~FixtureBase() { + // Must destroy any associated message contexts that may have refs to FRT_Request + // instance _before_ we destroy the request itself. + enqueuer._enqueued.clear(); + if (bound_request) { + bound_request->SubRef(); + } + } +}; + +struct SetStateFixture : FixtureBase { + SlimeClusterStateBundleCodec codec; + + SetStateFixture() : FixtureBase() {} + + void bind_request_params(EncodedClusterStateBundle& encoded_bundle, uint32_t uncompressed_length) { + bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting + auto* params = bound_request->GetParams(); + params->AddInt8(static_cast<uint8_t>(encoded_bundle._compression_type)); + params->AddInt32(uncompressed_length); + const auto buf_len = encoded_bundle._buffer->getDataLen(); + params->AddData(encoded_bundle._buffer->stealBuffer(), buf_len); + + bound_request->SetDetachedPT(&request_is_detached); + bound_request->SetReturnHandler(&return_handler); + } + + void create_request(const lib::ClusterStateBundle& bundle) { + // Only 1 request allowed per fixture due to lifetime handling snags + assert(bound_request == nullptr); + auto encoded_bundle = codec.encode(bundle); + bind_request_params(encoded_bundle, encoded_bundle._uncompressed_length); + } + + void assert_enqueued_operation_has_bundle(const lib::ClusterStateBundle& expectedBundle) { + ASSERT_TRUE(bound_request != nullptr); + ASSERT_TRUE(request_is_detached); + ASSERT_EQ(1, enqueuer._enqueued.size()); + auto& state_request = dynamic_cast<const api::SetSystemStateCommand&>(*enqueuer._enqueued[0]); + ASSERT_EQ(expectedBundle, state_request.getClusterStateBundle()); + } + + void assert_request_received_and_propagated(const lib::ClusterStateBundle& bundle) { + create_request(bundle); + cc_service->RPC_setDistributionStates(bound_request); + assert_enqueued_operation_has_bundle(bundle); + } + + void assert_request_returns_error_response(RPCRequestWrapper::ErrorCode error_code) { + cc_service->RPC_setDistributionStates(bound_request); + ASSERT_FALSE(request_is_detached); + ASSERT_TRUE(bound_request->IsError()); + ASSERT_EQ(static_cast<uint32_t>(error_code), bound_request->GetErrorCode()); + } + + static lib::ClusterStateBundle dummy_baseline_bundle() { + return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3")); + } + + static lib::ClusterStateBundle dummy_baseline_bundle_with_deferred_activation(bool deferred) { + return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"), {}, deferred); + } +}; + +std::shared_ptr<const lib::ClusterState> state_of(vespalib::stringref state) { + return std::make_shared<const lib::ClusterState>(state); +} + +vespalib::string make_compressable_state_string() { + vespalib::asciistream ss; + for (int i = 0; i < 99; ++i) { + ss << " ." << i << ".s:d"; + } + return vespalib::make_string("version:123 distributor:100%s storage:100%s", + ss.str().data(), ss.str().data()); +} + +} // anon namespace + +TEST_F(ClusterControllerApiRpcServiceTest, baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle) { + SetStateFixture f; + auto baseline = f.dummy_baseline_bundle(); + + f.assert_request_received_and_propagated(baseline); +} + +TEST_F(ClusterControllerApiRpcServiceTest, set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle) { + SetStateFixture f; + lib::ClusterStateBundle spaces_bundle( + lib::ClusterState("version:123 distributor:3 storage:3"), + {{FixedBucketSpaces::default_space(), state_of("version:123 distributor:3 storage:3 .0.s:d")}, + {FixedBucketSpaces::global_space(), state_of("version:123 distributor:3 .1.s:d storage:3")}}); + + f.assert_request_received_and_propagated(spaces_bundle); +} + +TEST_F(ClusterControllerApiRpcServiceTest, compressed_bundle_is_transparently_uncompressed) { + SetStateFixture f; + auto state_str = make_compressable_state_string(); + lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)}; + + f.create_request(compressable_bundle); + // First verify that the bundle is sent in compressed form + ASSERT_LT(f.bound_request->GetParams()->GetValue(2)._data._len, state_str.size()); + // Ensure we uncompress it to the original form + f.cc_service->RPC_setDistributionStates(f.bound_request); + f.assert_enqueued_operation_has_bundle(compressable_bundle); +} + +TEST_F(ClusterControllerApiRpcServiceTest, set_distribution_rpc_is_immediately_failed_if_listener_is_closed) { + SetStateFixture f; + f.create_request(f.dummy_baseline_bundle()); + f.cc_service->close(); + f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN); +} + +TEST_F(ClusterControllerApiRpcServiceTest, overly_large_uncompressed_bundle_size_parameter_returns_rpc_error) { + SetStateFixture f; + auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle()); + f.bind_request_params(encoded_bundle, ClusterControllerApiRpcService::StateBundleMaxUncompressedSize + 1); + f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST); +} + +TEST_F(ClusterControllerApiRpcServiceTest, mismatching_uncompressed_bundle_size_parameter_returns_rpc_error) { + SetStateFixture f; + auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle()); + f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100); + f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST); +} + +TEST_F(ClusterControllerApiRpcServiceTest, true_deferred_activation_flag_can_be_roundtrip_encoded) { + SetStateFixture f; + f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(true)); + +} + +TEST_F(ClusterControllerApiRpcServiceTest, false_deferred_activation_flag_can_be_roundtrip_encoded) { + SetStateFixture f; + f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(false)); +} + +struct ActivateStateFixture : FixtureBase { + ActivateStateFixture() : FixtureBase() {} + + void bind_request_params(uint32_t activate_version) { + bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting + auto* params = bound_request->GetParams(); + params->AddInt32(activate_version); + + bound_request->SetDetachedPT(&request_is_detached); + bound_request->SetReturnHandler(&return_handler); + } + + void create_request(uint32_t activate_version) { + // Only 1 request allowed per fixture due to lifetime handling snags + assert(bound_request == nullptr); + bind_request_params(activate_version); + } + + void assert_enqueued_operation_has_activate_version(uint32_t version) { + ASSERT_TRUE(bound_request != nullptr); + ASSERT_TRUE(request_is_detached); + ASSERT_EQ(1, enqueuer._enqueued.size()); + auto& state_request = dynamic_cast<const api::ActivateClusterStateVersionCommand&>(*enqueuer._enqueued[0]); + ASSERT_EQ(version, state_request.version()); + } + + void assert_request_received_and_propagated(uint32_t activate_version) { + create_request(activate_version); + cc_service->RPC_activateClusterStateVersion(bound_request); + assert_enqueued_operation_has_activate_version(activate_version); + } +}; + +TEST_F(ClusterControllerApiRpcServiceTest, activate_cluster_state_version_rpc_enqueues_command_with_version) { + ActivateStateFixture f; + f.assert_request_received_and_propagated(1234567); +} + +} |