summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-07-09 07:45:01 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-07-09 07:45:01 +0000
commit637de9a2313414da6930f703e2a63eae4637d0e6 (patch)
treee5b16eb1f9bc2bc3f138ad5e8148d3e00613a552 /storage
parent3af0dfbc0f011a70259424d446a8a9f1bd9b994b (diff)
Config control over what treads to skip.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def12
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp39
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h8
3 files changed, 46 insertions, 13 deletions
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index c855a4e683a..3e4b1fd6515 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -43,3 +43,15 @@ mbus.dispatch_on_encode bool default=true
## False will use network(fnet) thread
## Todo: Change default once verified in large scale deployment.
mbus.dispatch_on_decode bool default=false
+
+## Skip messenger thread on reply
+## Experimental
+mbus.skip_reply_thread bool default=false
+
+## Skip messenger thread on reply
+## Experimental
+mbus.skip_request_thread bool default=false
+
+## Skip communication manager thread on mbus requests
+## Experimental
+skip_thread bool default=false
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index fda672e1ee4..5588b6535e2 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -14,16 +14,17 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/vespalib/stllike/asciistream.h>
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/log/bufferedlogger.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/documentapi/messagebus/messages/getdocumentreply.h>
+
LOG_SETUP(".communication.manager");
using vespalib::make_string;
using document::FixedBucketSpaces;
-using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
namespace storage {
@@ -44,8 +45,8 @@ StorageTransportContext::~StorageTransportContext() = default;
void
CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageReply>& reply)
{
- assert(reply.get());
- enqueue(reply);
+ assert(reply);
+ optionalEnqueue(reply);
}
namespace {
@@ -100,7 +101,7 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg)
cmd->setTrace(docMsgPtr->getTrace());
cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(docMsgPtr)));
- enqueue(std::move(cmd));
+ optionalEnqueue(std::move(cmd));
} else if (protocolName == mbusprot::StorageProtocol::NAME) {
std::unique_ptr<mbusprot::StorageCommand> storMsgPtr(static_cast<mbusprot::StorageCommand*>(msg.release()));
@@ -112,7 +113,7 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg)
cmd->setTrace(storMsgPtr->getTrace());
cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(storMsgPtr)));
- enqueue(std::move(cmd));
+ optionalEnqueue(std::move(cmd));
} else {
LOGBM(warning, "Received unsupported message type %d for protocol '%s'",
msg->getType(), msg->getProtocol().c_str());
@@ -253,7 +254,9 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co
_count(0),
_configUri(configUri),
_closed(false),
- _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>())
+ _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()),
+ _thread(),
+ _skip_thread(false)
{
_component.registerMetricUpdateHook(*this, framework::SecondTime(5));
_component.registerMetric(_metrics);
@@ -351,6 +354,7 @@ CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig
void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> config)
{
// Only allow dynamic (live) reconfiguration of message bus limits.
+ _skip_thread = config->skipThread;
if (_mbus) {
configureMessageBusLimits(*config);
if (_mbus->getRPCNetwork().getPort() != config->mbusport) {
@@ -387,6 +391,9 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
CommunicationManagerConfig::Mbus::Compress::getTypeName(config->mbus.compress.type).c_str());
params.setCompressionConfig(CompressionConfig(compressionType, config->mbus.compress.level,
90, config->mbus.compress.limit));
+ params.setSkipRequestThread(config->mbus.skipRequestThread);
+ params.setSkipReplyThread(config->mbus.skipReplyThread);
+
// Configure messagebus here as we for legacy reasons have
// config here.
_mbus = std::make_unique<mbus::RPCMessageBus>(
@@ -437,11 +444,23 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg)
}
void
+CommunicationManager::optionalEnqueue(std::shared_ptr<api::StorageMessage> msg)
+{
+ assert(msg);
+ if (_skip_thread) {
+ LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
+ process(msg);
+ } else {
+ enqueue(std::move(msg));
+ }
+}
+
+void
CommunicationManager::enqueue(std::shared_ptr<api::StorageMessage> msg)
{
assert(msg);
- LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
- process(msg);
+ LOG(spam, "Enq storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
+ _eventQueue.enqueue(std::move(msg));
}
bool
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 1d64c8a8911..3fd82f3509d 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -108,12 +108,14 @@ private:
vespalib::Lock _messageBusSentLock;
std::map<api::StorageMessage::Id, std::shared_ptr<api::StorageCommand> > _messageBusSent;
- config::ConfigUri _configUri;
- std::atomic<bool> _closed;
- DocumentApiConverter _docApiConverter;
+ config::ConfigUri _configUri;
+ std::atomic<bool> _closed;
+ DocumentApiConverter _docApiConverter;
framework::Thread::UP _thread;
+ bool _skip_thread;
void updateMetrics(const MetricLockGuard &) override;
+ void optionalEnqueue(std::shared_ptr<api::StorageMessage> msg);
// Test needs access to configure() for live reconfig testing.
friend struct CommunicationManagerTest;