summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-06-29 07:58:08 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-06-29 07:58:08 +0000
commite60ef2ca182ac32903e5c2e379bf6eea9fe77a4a (patch)
tree303d9948b27baf82ead1823d0033a62c1ae66381 /storage
parent1dff254213b5977d1196e0160de82a582a623a72 (diff)
Always skip messenger and communicationmanager when you can.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp24
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h2
2 files changed, 4 insertions, 22 deletions
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 975e9361072..5a3ebebcd9e 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -49,7 +49,7 @@ void
CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageReply>& reply)
{
assert(reply);
- enqueue_or_process(reply);
+ process(reply);
}
namespace {
@@ -104,7 +104,7 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg)
cmd->setTrace(docMsgPtr->steal_trace());
cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(docMsgPtr)));
- enqueue_or_process(std::move(cmd));
+ process(std::move(cmd));
} else if (protocolName == mbusprot::StorageProtocol::NAME) {
std::unique_ptr<mbusprot::StorageCommand> storMsgPtr(static_cast<mbusprot::StorageCommand*>(msg.release()));
@@ -116,7 +116,7 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg)
cmd->setTrace(storMsgPtr->steal_trace());
cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(storMsgPtr)));
- enqueue_or_process(std::move(cmd));
+ process(std::move(cmd));
} else {
LOGBM(warning, "Received unsupported message type %d for protocol '%s'",
msg->getType(), msg->getProtocol().c_str());
@@ -268,8 +268,7 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co
_configUri(configUri),
_closed(false),
_docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()),
- _thread(),
- _skip_thread(false)
+ _thread()
{
_component.registerMetricUpdateHook(*this, framework::SecondTime(5));
_component.registerMetric(_metrics);
@@ -372,7 +371,6 @@ CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig
void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> config)
{
// Only allow dynamic (live) reconfiguration of message bus limits.
- _skip_thread = config->skipThread;
if (_mbus) {
configureMessageBusLimits(*config);
if (_mbus->getRPCNetwork().getPort() != config->mbusport) {
@@ -411,8 +409,6 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
CommunicationManagerConfig::Mbus::Compress::getTypeName(config->mbus.compress.type).c_str());
params.setCompressionConfig(CompressionConfig(compressionType, config->mbus.compress.level,
90, config->mbus.compress.limit));
- params.setSkipRequestThread(config->mbus.skipRequestThread);
- params.setSkipReplyThread(config->mbus.skipReplyThread);
// Configure messagebus here as we for legacy reasons have
// config here.
@@ -472,18 +468,6 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg)
}
}
-void
-CommunicationManager::enqueue_or_process(std::shared_ptr<api::StorageMessage> msg)
-{
- assert(msg);
- if (_skip_thread.load(std::memory_order_relaxed)) {
- LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
- process(msg);
- } else {
- dispatch_async(std::move(msg));
- }
-}
-
void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) {
LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
process(std::move(msg));
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 31c6fa00f0e..d52fb56cf20 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -121,10 +121,8 @@ private:
std::atomic<bool> _closed;
DocumentApiConverter _docApiConverter;
framework::Thread::UP _thread;
- std::atomic<bool> _skip_thread;
void updateMetrics(const MetricLockGuard &) override;
- void enqueue_or_process(std::shared_ptr<api::StorageMessage> msg);
// Test needs access to configure() for live reconfig testing.
friend struct CommunicationManagerTest;