diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-16 09:31:34 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-16 09:31:34 +0000 |
commit | 59e57bf76d30591e7a2b730317a3909c05c137f8 (patch) | |
tree | 43df3c8aad8ea41db0fefd841215a3ca652dc7f3 /storage | |
parent | a6b848e49d1aef3758b923f858186ba254ba4ec7 (diff) |
Make RPC compression configurable and use better defaults
Introduces a dedicated RPC compression sub-config. Default
values are the same as for MessageBus compression.
Diffstat (limited to 'storage')
5 files changed, 49 insertions, 9 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp index 6b1bad11fa7..69259ee08ec 100644 --- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp +++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp @@ -122,7 +122,8 @@ public: _shared_rpc_resources = std::make_unique<SharedRpcResources>(_config.getConfigId(), 0, 1); // TODO make codec provider into interface so we can test decode-failures more easily? _codec_provider = std::make_unique<MessageCodecProvider>(_doc_type_repo, _load_type_set); - _service = std::make_unique<StorageApiRpcService>(_messages, *_shared_rpc_resources, *_codec_provider); + StorageApiRpcService::Params params; + _service = std::make_unique<StorageApiRpcService>(_messages, *_shared_rpc_resources, *_codec_provider, params); _shared_rpc_resources->start_server_and_register_slobrok(_slobrok_id); // Explicitly wait until we are visible in Slobrok. Just waiting for mirror readiness is not enough. diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index 848936afeba..3babc98cbb1 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -62,3 +62,12 @@ use_direct_storageapi_rpc bool default=false ## The number of network (FNET) threads used by the shared rpc resource. rpc.num_network_threads int default=1 + +# Minimum size of packets to compress (0 means no compression) +rpc.compress.limit int default=1024 + +## Compression level for packets +rpc.compress.level int default=3 + +## Compression type for packets. +rpc.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index e4f1c82ce70..5471d66a864 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -246,6 +246,15 @@ struct PlaceHolderBucketResolver : public BucketResolver { } }; +vespalib::compression::CompressionConfig +convert_to_rpc_compression_config(const vespa::config::content::core::StorCommunicationmanagerConfig& mgr_config) { + using vespalib::compression::CompressionConfig; + using vespa::config::content::core::StorCommunicationmanagerConfig; + auto compression_type = CompressionConfig::toType( + StorCommunicationmanagerConfig::Rpc::Compress::getTypeName(mgr_config.rpc.compress.type).c_str()); + return CompressionConfig(compression_type, mgr_config.rpc.compress.level, 90, mgr_config.rpc.compress.limit); +} + } CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri) @@ -425,8 +434,10 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> _component.getLoadTypes()); _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport, config->rpc.numNetworkThreads); _cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources); + rpc::StorageApiRpcService::Params rpc_params; + rpc_params.compression_config = convert_to_rpc_compression_config(*config); _storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>( - *this, *_shared_rpc_resources, *_message_codec_provider); + *this, *_shared_rpc_resources, *_message_codec_provider, rpc_params); if (_mbus) { mbus::DestinationSessionParams dstParams; diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp index acc5472a9a9..68339e9c493 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -21,14 +21,18 @@ #include <vespa/log/log.h> LOG_SETUP(".storage.storage_api_rpc_service"); +using vespalib::compression::CompressionConfig; + namespace storage::rpc { StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher, SharedRpcResources& rpc_resources, - MessageCodecProvider& message_codec_provider) + MessageCodecProvider& message_codec_provider, + const Params& params) : _message_dispatcher(message_dispatcher), _rpc_resources(rpc_resources), _message_codec_provider(message_codec_provider), + _params(params), _target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory())) { register_server_methods(rpc_resources); @@ -36,6 +40,10 @@ StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher StorageApiRpcService::~StorageApiRpcService() = default; +StorageApiRpcService::Params::Params() = default; + +StorageApiRpcService::Params::~Params() = default; + void StorageApiRpcService::register_server_methods(SharedRpcResources& rpc_resources) { FRT_ReflectionBuilder rb(&rpc_resources.supervisor()); rb.DefineMethod("storageapi.v1.send", "bixbix", "bixbix", FRT_METHOD(StorageApiRpcService::RPC_rpc_v1_send), this); @@ -98,13 +106,13 @@ void encode_header_into_rpc_params(HeaderType& hdr, FRT_Values& params) { hdr.SerializeWithCachedSizesToArray(header_buf); } -void compress_and_add_payload_to_rpc_params(mbus::BlobRef payload, FRT_Values& params) { +void compress_and_add_payload_to_rpc_params(mbus::BlobRef payload, + FRT_Values& params, + const CompressionConfig& compression_cfg) { assert(payload.size() <= UINT32_MAX); vespalib::ConstBufferRef to_compress(payload.data(), payload.size()); vespalib::DataBuffer buf(vespalib::roundUp2inN(payload.size())); - // TODO configurable compression config? - vespalib::compression::CompressionConfig comp_cfg(vespalib::compression::CompressionConfig::Type::LZ4); - auto comp_type = compress(comp_cfg, to_compress, buf, false); + auto comp_type = compress(compression_cfg, to_compress, buf, false); assert(buf.getDataLen() <= UINT32_MAX); params.AddInt8(comp_type); @@ -119,7 +127,7 @@ void StorageApiRpcService::encode_and_compress_rpc_payload(const MessageType& ms auto wrapped_codec = _message_codec_provider.wrapped_codec(); auto payload = wrapped_codec->codec().encode(msg); - compress_and_add_payload_to_rpc_params(payload, params); + compress_and_add_payload_to_rpc_params(payload, params, _params.compression_config); } template <typename PayloadCodecCallback> diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h index 4385228bb5c..3fca08acc15 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h @@ -6,6 +6,7 @@ #include <vespa/fnet/frt/invoker.h> #include <vespa/storageapi/messageapi/returncode.h> #include <vespa/vespalib/stllike/string.h> +#include <vespa/vespalib/util/compressionconfig.h> #include <atomic> #include <memory> @@ -33,14 +34,24 @@ class MessageCodecProvider; class SharedRpcResources; class StorageApiRpcService : public FRT_Invokable, public FRT_IRequestWait { +public: + struct Params { + vespalib::compression::CompressionConfig compression_config; + + Params(); + ~Params(); + }; +private: MessageDispatcher& _message_dispatcher; SharedRpcResources& _rpc_resources; MessageCodecProvider& _message_codec_provider; + const Params _params; std::unique_ptr<CachingRpcTargetResolver> _target_resolver; public: StorageApiRpcService(MessageDispatcher& message_dispatcher, SharedRpcResources& rpc_resources, - MessageCodecProvider& message_codec_provider); + MessageCodecProvider& message_codec_provider, + const Params& params); ~StorageApiRpcService() override; void RPC_rpc_v1_send(FRT_RPCRequest* req); |