summaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-12-07 12:46:45 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-12-07 12:46:45 +0000
commitdf32bed4cef955e98f8245583331b01d9d1aba13 (patch)
treeaa44623d38fd1526444a0880c85fd85d4415a05b /storage/src
parentc8b8988ee986fa0a08274aa8ef12b7fbddd90488 (diff)
Annotate config requiring restart, and properly handle the ones that do not
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def28
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp8
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h4
3 files changed, 21 insertions, 19 deletions
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index e075f57514b..8f24646367f 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -18,39 +18,39 @@ mbus_distributor_node_max_pending_size int default=0
mbus_content_node_max_pending_size int default=0
# Minimum size of packets to compress (0 means no compression)
-mbus.compress.limit int default=1024
+mbus.compress.limit int default=1024 restart
## Compression level for packets
-mbus.compress.level int default=3
+mbus.compress.level int default=3 restart
## Compression type for packets.
-mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4
+mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 restart
## TTL for rpc target cache
-mbus.rpctargetcache.ttl double default = 600
+mbus.rpctargetcache.ttl double default = 600 restart
## Number of threads for mbus threadpool
## Any value below 1 will be 1.
-mbus.num_threads int default=4
+mbus.num_threads int default=4 restart
mbus.optimize_for enum {LATENCY, THROUGHPUT, ADAPTIVE} default = LATENCY
## Enable to use above thread pool for encoding replies
## False will use network(fnet) thread
-mbus.dispatch_on_encode bool default=true
+mbus.dispatch_on_encode bool default=true restart
## Enable to use above thread pool for decoding replies
## False will use network(fnet) thread
## Todo: Change default once verified in large scale deployment.
-mbus.dispatch_on_decode bool default=false
+mbus.dispatch_on_decode bool default=false restart
## Skip messenger thread on reply
## Experimental
-mbus.skip_reply_thread bool default=false
+mbus.skip_reply_thread bool default=false restart
## Skip messenger thread on reply
## Experimental
-mbus.skip_request_thread bool default=false
+mbus.skip_request_thread bool default=false restart
## Skip communication manager thread on mbus requests
## Experimental
@@ -61,7 +61,7 @@ skip_thread bool default=false
use_direct_storageapi_rpc bool default=false
## The number of network (FNET) threads used by the shared rpc resource.
-rpc.num_network_threads int default=2
+rpc.num_network_threads int default=2 restart
## The number of (FNET) RPC targets to use per node in the cluster.
##
@@ -70,13 +70,13 @@ rpc.num_network_threads int default=2
## and the RPC target itself handles sequencing of these messages.
## NB !! It is vital that this number is kept in sync with stor-filestor:num_network_threads.
## Only skilled vespa core developers should touch this.
-rpc.num_targets_per_node int default=1
+rpc.num_targets_per_node int default=1 restart
# Minimum size of packets to compress (0 means no compression)
-rpc.compress.limit int default=1024
+rpc.compress.limit int default=1024 restart
## Compression level for packets
-rpc.compress.level int default=3
+rpc.compress.level int default=3 restart
## Compression type for packets.
-rpc.compress.type enum {NONE, LZ4, ZSTD} default=LZ4
+rpc.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 restart
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index e05b502f856..9d4b7b58a6f 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -375,6 +375,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
{
// Only allow dynamic (live) reconfiguration of message bus limits.
_skip_thread = config->skipThread;
+ _use_direct_storageapi_rpc = config->useDirectStorageapiRpc;
if (_mbus) {
configureMessageBusLimits(*config);
if (_mbus->getRPCNetwork().getPort() != config->mbusport) {
@@ -427,7 +428,6 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
configureMessageBusLimits(*config);
}
- _use_direct_storageapi_rpc = config->useDirectStorageapiRpc;
_message_codec_provider = std::make_unique<rpc::MessageCodecProvider>(_component.getTypeRepo()->documentTypeRepo);
_shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport, config->rpc.numNetworkThreads);
_cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources);
@@ -476,7 +476,7 @@ void
CommunicationManager::enqueue_or_process(std::shared_ptr<api::StorageMessage> msg)
{
assert(msg);
- if (_skip_thread) {
+ if (_skip_thread.load(std::memory_order_relaxed)) {
LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
process(msg);
} else {
@@ -573,7 +573,9 @@ CommunicationManager::sendCommand(
case api::StorageMessageAddress::Protocol::STORAGE:
{
LOG(debug, "Send to %s: %s", address.toString().c_str(), msg->toString().c_str());
- if (_use_direct_storageapi_rpc && _storage_api_rpc_service->target_supports_direct_rpc(address)) {
+ if (_use_direct_storageapi_rpc.load(std::memory_order_relaxed) &&
+ _storage_api_rpc_service->target_supports_direct_rpc(address))
+ {
_storage_api_rpc_service->send_rpc_v1_request(msg);
} else {
auto cmd = std::make_unique<mbusprot::StorageCommand>(msg);
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 7227f1d7e5b..db88f95af6d 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -118,8 +118,8 @@ private:
std::atomic<bool> _closed;
DocumentApiConverter _docApiConverter;
framework::Thread::UP _thread;
- bool _skip_thread;
- bool _use_direct_storageapi_rpc;
+ std::atomic<bool> _skip_thread;
+ std::atomic<bool> _use_direct_storageapi_rpc;
void updateMetrics(const MetricLockGuard &) override;
void enqueue_or_process(std::shared_ptr<api::StorageMessage> msg);