diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-12-07 12:46:45 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-12-07 12:46:45 +0000 |
commit | df32bed4cef955e98f8245583331b01d9d1aba13 (patch) | |
tree | aa44623d38fd1526444a0880c85fd85d4415a05b | |
parent | c8b8988ee986fa0a08274aa8ef12b7fbddd90488 (diff) |
Annotate config requiring restart, and properly handle the ones that do not
3 files changed, 21 insertions, 19 deletions
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index e075f57514b..8f24646367f 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -18,39 +18,39 @@ mbus_distributor_node_max_pending_size int default=0 mbus_content_node_max_pending_size int default=0 # Minimum size of packets to compress (0 means no compression) -mbus.compress.limit int default=1024 +mbus.compress.limit int default=1024 restart ## Compression level for packets -mbus.compress.level int default=3 +mbus.compress.level int default=3 restart ## Compression type for packets. -mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 +mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 restart ## TTL for rpc target cache -mbus.rpctargetcache.ttl double default = 600 +mbus.rpctargetcache.ttl double default = 600 restart ## Number of threads for mbus threadpool ## Any value below 1 will be 1. -mbus.num_threads int default=4 +mbus.num_threads int default=4 restart mbus.optimize_for enum {LATENCY, THROUGHPUT, ADAPTIVE} default = LATENCY ## Enable to use above thread pool for encoding replies ## False will use network(fnet) thread -mbus.dispatch_on_encode bool default=true +mbus.dispatch_on_encode bool default=true restart ## Enable to use above thread pool for decoding replies ## False will use network(fnet) thread ## Todo: Change default once verified in large scale deployment. -mbus.dispatch_on_decode bool default=false +mbus.dispatch_on_decode bool default=false restart ## Skip messenger thread on reply ## Experimental -mbus.skip_reply_thread bool default=false +mbus.skip_reply_thread bool default=false restart ## Skip messenger thread on reply ## Experimental -mbus.skip_request_thread bool default=false +mbus.skip_request_thread bool default=false restart ## Skip communication manager thread on mbus requests ## Experimental @@ -61,7 +61,7 @@ skip_thread bool default=false use_direct_storageapi_rpc bool default=false ## The number of network (FNET) threads used by the shared rpc resource. -rpc.num_network_threads int default=2 +rpc.num_network_threads int default=2 restart ## The number of (FNET) RPC targets to use per node in the cluster. ## @@ -70,13 +70,13 @@ rpc.num_network_threads int default=2 ## and the RPC target itself handles sequencing of these messages. ## NB !! It is vital that this number is kept in sync with stor-filestor:num_network_threads. ## Only skilled vespa core developers should touch this. -rpc.num_targets_per_node int default=1 +rpc.num_targets_per_node int default=1 restart # Minimum size of packets to compress (0 means no compression) -rpc.compress.limit int default=1024 +rpc.compress.limit int default=1024 restart ## Compression level for packets -rpc.compress.level int default=3 +rpc.compress.level int default=3 restart ## Compression type for packets. -rpc.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 +rpc.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 restart diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index e05b502f856..9d4b7b58a6f 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -375,6 +375,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> { // Only allow dynamic (live) reconfiguration of message bus limits. _skip_thread = config->skipThread; + _use_direct_storageapi_rpc = config->useDirectStorageapiRpc; if (_mbus) { configureMessageBusLimits(*config); if (_mbus->getRPCNetwork().getPort() != config->mbusport) { @@ -427,7 +428,6 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> configureMessageBusLimits(*config); } - _use_direct_storageapi_rpc = config->useDirectStorageapiRpc; _message_codec_provider = std::make_unique<rpc::MessageCodecProvider>(_component.getTypeRepo()->documentTypeRepo); _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport, config->rpc.numNetworkThreads); _cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources); @@ -476,7 +476,7 @@ void CommunicationManager::enqueue_or_process(std::shared_ptr<api::StorageMessage> msg) { assert(msg); - if (_skip_thread) { + if (_skip_thread.load(std::memory_order_relaxed)) { LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); process(msg); } else { @@ -573,7 +573,9 @@ CommunicationManager::sendCommand( case api::StorageMessageAddress::Protocol::STORAGE: { LOG(debug, "Send to %s: %s", address.toString().c_str(), msg->toString().c_str()); - if (_use_direct_storageapi_rpc && _storage_api_rpc_service->target_supports_direct_rpc(address)) { + if (_use_direct_storageapi_rpc.load(std::memory_order_relaxed) && + _storage_api_rpc_service->target_supports_direct_rpc(address)) + { _storage_api_rpc_service->send_rpc_v1_request(msg); } else { auto cmd = std::make_unique<mbusprot::StorageCommand>(msg); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 7227f1d7e5b..db88f95af6d 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -118,8 +118,8 @@ private: std::atomic<bool> _closed; DocumentApiConverter _docApiConverter; framework::Thread::UP _thread; - bool _skip_thread; - bool _use_direct_storageapi_rpc; + std::atomic<bool> _skip_thread; + std::atomic<bool> _use_direct_storageapi_rpc; void updateMetrics(const MetricLockGuard &) override; void enqueue_or_process(std::shared_ptr<api::StorageMessage> msg); |