aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp16
-rw-r--r--storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp2
-rw-r--r--storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp2
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def3
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp3
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp6
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h3
7 files changed, 27 insertions, 8 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 5a5f5cb46d6..170885d1d99 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -269,6 +269,7 @@ class BMParams {
uint32_t _update_passes;
uint32_t _remove_passes;
uint32_t _rpc_network_threads;
+ uint32_t _rpc_events_before_wakeup;
uint32_t _rpc_targets_per_node;
uint32_t _response_threads;
uint32_t _max_pending;
@@ -292,7 +293,8 @@ public:
_put_passes(2),
_update_passes(1),
_remove_passes(2),
- _rpc_network_threads(1), // Same default as in stor-communicationmanager.def
+ _rpc_network_threads(1), // Same default as previous in stor-communicationmanager.def
+ _rpc_events_before_wakeup(1), // Same default as in stor-communicationmanager.def
_rpc_targets_per_node(1), // Same default as in stor-communicationmanager.def
_response_threads(2), // Same default as in stor-filestor.def
_max_pending(1000),
@@ -318,6 +320,7 @@ public:
uint32_t get_update_passes() const { return _update_passes; }
uint32_t get_remove_passes() const { return _remove_passes; }
uint32_t get_rpc_network_threads() const { return _rpc_network_threads; }
+ uint32_t get_rpc_events_before_wakup() const { return _rpc_events_before_wakeup; }
uint32_t get_rpc_targets_per_node() const { return _rpc_targets_per_node; }
uint32_t get_response_threads() const { return _response_threads; }
bool get_enable_distributor() const { return _enable_distributor; }
@@ -336,6 +339,7 @@ public:
void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; }
void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; }
void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; }
+ void set_rpc_events_before_wakeup(uint32_t value) { _rpc_events_before_wakeup = value; }
void set_rpc_targets_per_node(uint32_t targets_in) { _rpc_targets_per_node = targets_in; }
void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; }
void set_enable_distributor(bool value) { _enable_distributor = value; }
@@ -491,6 +495,7 @@ struct MyStorageConfig
make_slobroks_config(slobroks, slobrok_port);
stor_communicationmanager.useDirectStorageapiRpc = true;
stor_communicationmanager.rpc.numNetworkThreads = params.get_rpc_network_threads();
+ stor_communicationmanager.rpc.eventsBeforeWakeup = params.get_rpc_events_before_wakup();
stor_communicationmanager.rpc.numTargetsPerNode = params.get_rpc_targets_per_node();
stor_communicationmanager.mbusport = mbus_port;
stor_communicationmanager.rpcport = rpc_port;
@@ -878,7 +883,8 @@ PersistenceProviderFixture::start_service_layer(const BMParams& params)
_service_layer->getNode().waitUntilInitialized();
LOG(info, "start rpc client shared resources");
config::ConfigUri client_config_uri("bm-rpc-client", _config_context);
- _rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>(client_config_uri, _rpc_client_port, 100);
+ _rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>
+ (client_config_uri, _rpc_client_port, 100, params.get_rpc_events_before_wakup());
_rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client");
wait_slobrok("storage/cluster.storage/storage/0/default");
wait_slobrok("storage/cluster.storage/storage/0");
@@ -1369,6 +1375,7 @@ App::usage()
"[--put-passes put-passes]\n"
"[--update-passes update-passes]\n"
"[--remove-passes remove-passes]\n"
+ "[--rpc-events-before-wakeup events]\n"
"[--rpc-network-threads threads]\n"
"[--rpc-targets-per-node targets]\n"
"[--response-threads threads]\n"
@@ -1399,6 +1406,7 @@ App::get_options()
{ "put-passes", 1, nullptr, 0 },
{ "remove-passes", 1, nullptr, 0 },
{ "response-threads", 1, nullptr, 0 },
+ { "rpc-events-before-wakeup", 1, nullptr, 0 },
{ "rpc-network-threads", 1, nullptr, 0 },
{ "rpc-targets-per-node", 1, nullptr, 0 },
{ "skip-get-spi-bucket-info", 0, nullptr, 0 },
@@ -1420,6 +1428,7 @@ App::get_options()
LONGOPT_PUT_PASSES,
LONGOPT_REMOVE_PASSES,
LONGOPT_RESPONSE_THREADS,
+ LONGOPT_RPC_EVENTS_BEFORE_WAKEUP,
LONGOPT_RPC_NETWORK_THREADS,
LONGOPT_RPC_TARGETS_PER_NODE,
LONGOPT_SKIP_GET_SPI_BUCKET_INFO,
@@ -1471,6 +1480,9 @@ App::get_options()
case LONGOPT_RESPONSE_THREADS:
_bm_params.set_response_threads(atoi(opt_argument));
break;
+ case LONGOPT_RPC_EVENTS_BEFORE_WAKEUP:
+ _bm_params.set_rpc_events_before_wakeup(atoi(opt_argument));
+ break;
case LONGOPT_RPC_NETWORK_THREADS:
_bm_params.set_rpc_network_threads(atoi(opt_argument));
break;
diff --git a/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp b/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp
index 8b009e02f28..a39ee819f64 100644
--- a/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp
@@ -57,7 +57,7 @@ struct FixtureBase {
config.getConfig("stor-server").set("node_index", "1");
addSlobrokConfig(config, slobrok);
- shared_rpc_resources = std::make_unique<SharedRpcResources>(config.getConfigId(), 0, 1);
+ shared_rpc_resources = std::make_unique<SharedRpcResources>(config.getConfigId(), 0, 1, 1);
cc_service = std::make_unique<ClusterControllerApiRpcService>(dispatcher, *shared_rpc_resources);
shared_rpc_resources->start_server_and_register_slobrok("my_cool_rpc_test");
}
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
index d1cdd649787..0b33da39c41 100644
--- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -119,7 +119,7 @@ public:
cfg.set("is_distributor", is_distributor ? "true" : "false");
addSlobrokConfig(_config, slobrok);
- _shared_rpc_resources = std::make_unique<SharedRpcResources>(_config.getConfigId(), 0, 1);
+ _shared_rpc_resources = std::make_unique<SharedRpcResources>(_config.getConfigId(), 0, 1, 1);
// TODO make codec provider into interface so we can test decode-failures more easily?
_codec_provider = std::make_unique<MessageCodecProvider>(_doc_type_repo);
}
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 8f24646367f..d674ee96aa3 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -63,6 +63,9 @@ use_direct_storageapi_rpc bool default=false
## The number of network (FNET) threads used by the shared rpc resource.
rpc.num_network_threads int default=2 restart
+## The number of events in the queue of a network (FNET) thread before it is woken up.
+rpc.events_before_wakeup int default=1 restart
+
## The number of (FNET) RPC targets to use per node in the cluster.
##
## The bucket id associated with a message is used to select the RPC target.
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 9d4b7b58a6f..7a3dd16bbd1 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -429,7 +429,8 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
}
_message_codec_provider = std::make_unique<rpc::MessageCodecProvider>(_component.getTypeRepo()->documentTypeRepo);
- _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport, config->rpc.numNetworkThreads);
+ _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport,
+ config->rpc.numNetworkThreads, config->rpc.eventsBeforeWakeup);
_cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources);
rpc::StorageApiRpcService::Params rpc_params;
rpc_params.compression_config = convert_to_rpc_compression_config(*config);
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
index 3aa3d21aa7b..4c075f44d35 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -62,9 +62,11 @@ public:
SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri,
int rpc_server_port,
- size_t rpc_thread_pool_size)
+ size_t rpc_thread_pool_size,
+ size_t rpc_events_before_wakeup)
: _thread_pool(std::make_unique<FastOS_ThreadPool>(1024*60)),
- _transport(std::make_unique<FNET_Transport>(TransportConfig(rpc_thread_pool_size).events_before_wakeup(1))),
+ _transport(std::make_unique<FNET_Transport>(TransportConfig(rpc_thread_pool_size).
+ events_before_wakeup(rpc_events_before_wakeup))),
_orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_slobrok_register(std::make_unique<slobrok::api::RegisterAPI>(*_orb, slobrok::ConfiguratorFactory(config_uri))),
_slobrok_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, slobrok::ConfiguratorFactory(config_uri))),
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
index 740277218c3..1fdd0f2648b 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
@@ -30,7 +30,8 @@ class SharedRpcResources {
int _rpc_server_port;
bool _shutdown;
public:
- SharedRpcResources(const config::ConfigUri& config_uri, int rpc_server_port, size_t rpc_thread_pool_size);
+ SharedRpcResources(const config::ConfigUri& config_uri, int rpc_server_port,
+ size_t rpc_thread_pool_size, size_t rpc_events_before_wakeup);
~SharedRpcResources();
FRT_Supervisor& supervisor() noexcept { return *_orb; }