aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-09-16 09:31:34 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-09-16 09:31:34 +0000
commit59e57bf76d30591e7a2b730317a3909c05c137f8 (patch)
tree43df3c8aad8ea41db0fefd841215a3ca652dc7f3 /storage
parenta6b848e49d1aef3758b923f858186ba254ba4ec7 (diff)
Make RPC compression configurable and use better defaults
Introduces a dedicated RPC compression sub-config. Default values are the same as for MessageBus compression.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp3
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def9
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp13
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp20
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h13
5 files changed, 49 insertions, 9 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
index 6b1bad11fa7..69259ee08ec 100644
--- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -122,7 +122,8 @@ public:
_shared_rpc_resources = std::make_unique<SharedRpcResources>(_config.getConfigId(), 0, 1);
// TODO make codec provider into interface so we can test decode-failures more easily?
_codec_provider = std::make_unique<MessageCodecProvider>(_doc_type_repo, _load_type_set);
- _service = std::make_unique<StorageApiRpcService>(_messages, *_shared_rpc_resources, *_codec_provider);
+ StorageApiRpcService::Params params;
+ _service = std::make_unique<StorageApiRpcService>(_messages, *_shared_rpc_resources, *_codec_provider, params);
_shared_rpc_resources->start_server_and_register_slobrok(_slobrok_id);
// Explicitly wait until we are visible in Slobrok. Just waiting for mirror readiness is not enough.
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 848936afeba..3babc98cbb1 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -62,3 +62,12 @@ use_direct_storageapi_rpc bool default=false
## The number of network (FNET) threads used by the shared rpc resource.
rpc.num_network_threads int default=1
+
+# Minimum size of packets to compress (0 means no compression)
+rpc.compress.limit int default=1024
+
+## Compression level for packets
+rpc.compress.level int default=3
+
+## Compression type for packets.
+rpc.compress.type enum {NONE, LZ4, ZSTD} default=LZ4
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index e4f1c82ce70..5471d66a864 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -246,6 +246,15 @@ struct PlaceHolderBucketResolver : public BucketResolver {
}
};
+vespalib::compression::CompressionConfig
+convert_to_rpc_compression_config(const vespa::config::content::core::StorCommunicationmanagerConfig& mgr_config) {
+ using vespalib::compression::CompressionConfig;
+ using vespa::config::content::core::StorCommunicationmanagerConfig;
+ auto compression_type = CompressionConfig::toType(
+ StorCommunicationmanagerConfig::Rpc::Compress::getTypeName(mgr_config.rpc.compress.type).c_str());
+ return CompressionConfig(compression_type, mgr_config.rpc.compress.level, 90, mgr_config.rpc.compress.limit);
+}
+
}
CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri)
@@ -425,8 +434,10 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
_component.getLoadTypes());
_shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport, config->rpc.numNetworkThreads);
_cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources);
+ rpc::StorageApiRpcService::Params rpc_params;
+ rpc_params.compression_config = convert_to_rpc_compression_config(*config);
_storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>(
- *this, *_shared_rpc_resources, *_message_codec_provider);
+ *this, *_shared_rpc_resources, *_message_codec_provider, rpc_params);
if (_mbus) {
mbus::DestinationSessionParams dstParams;
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index acc5472a9a9..68339e9c493 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -21,14 +21,18 @@
#include <vespa/log/log.h>
LOG_SETUP(".storage.storage_api_rpc_service");
+using vespalib::compression::CompressionConfig;
+
namespace storage::rpc {
StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher,
SharedRpcResources& rpc_resources,
- MessageCodecProvider& message_codec_provider)
+ MessageCodecProvider& message_codec_provider,
+ const Params& params)
: _message_dispatcher(message_dispatcher),
_rpc_resources(rpc_resources),
_message_codec_provider(message_codec_provider),
+ _params(params),
_target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory()))
{
register_server_methods(rpc_resources);
@@ -36,6 +40,10 @@ StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher
StorageApiRpcService::~StorageApiRpcService() = default;
+StorageApiRpcService::Params::Params() = default;
+
+StorageApiRpcService::Params::~Params() = default;
+
void StorageApiRpcService::register_server_methods(SharedRpcResources& rpc_resources) {
FRT_ReflectionBuilder rb(&rpc_resources.supervisor());
rb.DefineMethod("storageapi.v1.send", "bixbix", "bixbix", FRT_METHOD(StorageApiRpcService::RPC_rpc_v1_send), this);
@@ -98,13 +106,13 @@ void encode_header_into_rpc_params(HeaderType& hdr, FRT_Values& params) {
hdr.SerializeWithCachedSizesToArray(header_buf);
}
-void compress_and_add_payload_to_rpc_params(mbus::BlobRef payload, FRT_Values& params) {
+void compress_and_add_payload_to_rpc_params(mbus::BlobRef payload,
+ FRT_Values& params,
+ const CompressionConfig& compression_cfg) {
assert(payload.size() <= UINT32_MAX);
vespalib::ConstBufferRef to_compress(payload.data(), payload.size());
vespalib::DataBuffer buf(vespalib::roundUp2inN(payload.size()));
- // TODO configurable compression config?
- vespalib::compression::CompressionConfig comp_cfg(vespalib::compression::CompressionConfig::Type::LZ4);
- auto comp_type = compress(comp_cfg, to_compress, buf, false);
+ auto comp_type = compress(compression_cfg, to_compress, buf, false);
assert(buf.getDataLen() <= UINT32_MAX);
params.AddInt8(comp_type);
@@ -119,7 +127,7 @@ void StorageApiRpcService::encode_and_compress_rpc_payload(const MessageType& ms
auto wrapped_codec = _message_codec_provider.wrapped_codec();
auto payload = wrapped_codec->codec().encode(msg);
- compress_and_add_payload_to_rpc_params(payload, params);
+ compress_and_add_payload_to_rpc_params(payload, params, _params.compression_config);
}
template <typename PayloadCodecCallback>
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
index 4385228bb5c..3fca08acc15 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -6,6 +6,7 @@
#include <vespa/fnet/frt/invoker.h>
#include <vespa/storageapi/messageapi/returncode.h>
#include <vespa/vespalib/stllike/string.h>
+#include <vespa/vespalib/util/compressionconfig.h>
#include <atomic>
#include <memory>
@@ -33,14 +34,24 @@ class MessageCodecProvider;
class SharedRpcResources;
class StorageApiRpcService : public FRT_Invokable, public FRT_IRequestWait {
+public:
+ struct Params {
+ vespalib::compression::CompressionConfig compression_config;
+
+ Params();
+ ~Params();
+ };
+private:
MessageDispatcher& _message_dispatcher;
SharedRpcResources& _rpc_resources;
MessageCodecProvider& _message_codec_provider;
+ const Params _params;
std::unique_ptr<CachingRpcTargetResolver> _target_resolver;
public:
StorageApiRpcService(MessageDispatcher& message_dispatcher,
SharedRpcResources& rpc_resources,
- MessageCodecProvider& message_codec_provider);
+ MessageCodecProvider& message_codec_provider,
+ const Params& params);
~StorageApiRpcService() override;
void RPC_rpc_v1_send(FRT_RPCRequest* req);