diff options
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/vespa/storage/storageserver/communicationmanager.cpp | 24 | ||||
-rw-r--r-- | storage/src/vespa/storage/storageserver/communicationmanager.h | 2 |
2 files changed, 4 insertions, 22 deletions
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 975e9361072..5a3ebebcd9e 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -49,7 +49,7 @@ void CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageReply>& reply) { assert(reply); - enqueue_or_process(reply); + process(reply); } namespace { @@ -104,7 +104,7 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg) cmd->setTrace(docMsgPtr->steal_trace()); cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(docMsgPtr))); - enqueue_or_process(std::move(cmd)); + process(std::move(cmd)); } else if (protocolName == mbusprot::StorageProtocol::NAME) { std::unique_ptr<mbusprot::StorageCommand> storMsgPtr(static_cast<mbusprot::StorageCommand*>(msg.release())); @@ -116,7 +116,7 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg) cmd->setTrace(storMsgPtr->steal_trace()); cmd->setTransportContext(std::make_unique<StorageTransportContext>(std::move(storMsgPtr))); - enqueue_or_process(std::move(cmd)); + process(std::move(cmd)); } else { LOGBM(warning, "Received unsupported message type %d for protocol '%s'", msg->getType(), msg->getProtocol().c_str()); @@ -268,8 +268,7 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co _configUri(configUri), _closed(false), _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()), - _thread(), - _skip_thread(false) + _thread() { _component.registerMetricUpdateHook(*this, framework::SecondTime(5)); _component.registerMetric(_metrics); @@ -372,7 +371,6 @@ CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> config) { // Only allow dynamic (live) reconfiguration of message bus limits. - _skip_thread = config->skipThread; if (_mbus) { configureMessageBusLimits(*config); if (_mbus->getRPCNetwork().getPort() != config->mbusport) { @@ -411,8 +409,6 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> CommunicationManagerConfig::Mbus::Compress::getTypeName(config->mbus.compress.type).c_str()); params.setCompressionConfig(CompressionConfig(compressionType, config->mbus.compress.level, 90, config->mbus.compress.limit)); - params.setSkipRequestThread(config->mbus.skipRequestThread); - params.setSkipReplyThread(config->mbus.skipReplyThread); // Configure messagebus here as we for legacy reasons have // config here. @@ -472,18 +468,6 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg) } } -void -CommunicationManager::enqueue_or_process(std::shared_ptr<api::StorageMessage> msg) -{ - assert(msg); - if (_skip_thread.load(std::memory_order_relaxed)) { - LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); - process(msg); - } else { - dispatch_async(std::move(msg)); - } -} - void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) { LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); process(std::move(msg)); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 31c6fa00f0e..d52fb56cf20 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -121,10 +121,8 @@ private: std::atomic<bool> _closed; DocumentApiConverter _docApiConverter; framework::Thread::UP _thread; - std::atomic<bool> _skip_thread; void updateMetrics(const MetricLockGuard &) override; - void enqueue_or_process(std::shared_ptr<api::StorageMessage> msg); // Test needs access to configure() for live reconfig testing. friend struct CommunicationManagerTest; |