From 229bc493e9cc8e38b7307482cc547f14a7644aad Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 8 Jun 2017 14:55:34 +0200 Subject: Whitespace only --- .../storage/storageserver/communicationmanager.cpp | 222 +++++++-------------- .../storage/storageserver/communicationmanager.h | 11 +- 2 files changed, 76 insertions(+), 157 deletions(-) (limited to 'storage') diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 9087482cb42..b4d1f4e4bfb 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -98,8 +98,7 @@ CommunicationManager::getAllocationType(api::StorageMessage& msg) const void -CommunicationManager::receiveStorageReply( - const std::shared_ptr& reply) +CommunicationManager::receiveStorageReply(const std::shared_ptr& reply) { assert(reply.get()); enqueue(reply); @@ -108,8 +107,7 @@ CommunicationManager::receiveStorageReply( namespace { vespalib::string getNodeId(StorageComponent& sc) { vespalib::asciistream ost; - ost << sc.getClusterName() << "/" << sc.getNodeType() - << "/" << sc.getIndex(); + ost << sc.getClusterName() << "/" << sc.getNodeType() << "/" << sc.getIndex(); return ost.str(); } @@ -125,14 +123,10 @@ CommunicationManager::handleMessage(std::unique_ptr msg) // Relaxed load since we're not doing any dependent reads that aren't // already covered by some other form of explicit synchronization. if (_closed.load(std::memory_order_relaxed)) { - LOG(debug, "Not handling command of type %d as we have closed down", - msg->getType()); - MBUS_TRACE(msg->getTrace(), 6, - "Communication manager: Failing message as we are closed"); + LOG(debug, "Not handling command of type %d as we have closed down", msg->getType()); + MBUS_TRACE(msg->getTrace(), 6, "Communication manager: Failing message as we are closed"); std::unique_ptr reply(new mbus::EmptyReply()); - reply->addError(mbus::Error( - documentapi::DocumentProtocol::ERROR_ABORTED, - "Node shutting down")); + reply->addError(mbus::Error(documentapi::DocumentProtocol::ERROR_ABORTED, "Node shutting down")); msg->swapState(*reply); _messageBusSession->reply(std::move(reply)); return; @@ -140,40 +134,33 @@ CommunicationManager::handleMessage(std::unique_ptr msg) const vespalib::string & protocolName = msg->getProtocol(); if (protocolName == documentapi::DocumentProtocol::NAME) { - std::unique_ptr docMsgPtr( - static_cast(msg.release())); + std::unique_ptr docMsgPtr(static_cast(msg.release())); assert(docMsgPtr.get()); std::unique_ptr cmd( - _docApiConverter.toStorageAPI( - static_cast(*docMsgPtr), - _component.getTypeRepo())); + _docApiConverter.toStorageAPI(static_cast(*docMsgPtr), _component.getTypeRepo())); if (!cmd.get()) { - LOGBM(warning, "Unsupported message: StorageApi could not convert " - "message of type %d to a storageapi message", + LOGBM(warning, "Unsupported message: StorageApi could not convert message of type %d to a storageapi message", docMsgPtr->getType()); _metrics.convertToStorageAPIFailures.inc(); return; } cmd->setTrace(docMsgPtr->getTrace()); - cmd->setTransportContext(std::unique_ptr( - new StorageTransportContext(std::move(docMsgPtr)))); + cmd->setTransportContext(std::unique_ptr(new StorageTransportContext(std::move(docMsgPtr)))); enqueue(std::shared_ptr(cmd.release())); } else if (protocolName == mbusprot::StorageProtocol::NAME) { - std::unique_ptr storMsgPtr( - static_cast(msg.release())); + std::unique_ptr storMsgPtr(static_cast(msg.release())); assert(storMsgPtr.get()); const std::shared_ptr & cmd = storMsgPtr->getCommand(); cmd->setTimeout(storMsgPtr->getTimeRemaining()); cmd->setTrace(storMsgPtr->getTrace()); - cmd->setTransportContext(std::unique_ptr( - new StorageTransportContext(std::move(storMsgPtr)))); + cmd->setTransportContext(std::unique_ptr(new StorageTransportContext(std::move(storMsgPtr)))); enqueue(cmd); } else { @@ -185,13 +172,11 @@ CommunicationManager::handleMessage(std::unique_ptr msg) void CommunicationManager::handleReply(std::unique_ptr reply) { - MBUS_TRACE(reply->getTrace(), 4, getNodeId(_component) - + "Communication manager: Received reply from message bus"); + MBUS_TRACE(reply->getTrace(), 4, getNodeId(_component) + "Communication manager: Received reply from message bus"); // Relaxed load since we're not doing any dependent reads that aren't // already covered by some other form of explicit synchronization. if (_closed.load(std::memory_order_relaxed)) { - LOG(debug, "Not handling reply of type %d as we have closed down", - reply->getType()); + LOG(debug, "Not handling reply of type %d as we have closed down", reply->getType()); return; } LOG(spam, "Got reply of type %d, trace is %s", @@ -205,21 +190,17 @@ CommunicationManager::handleReply(std::unique_ptr reply) const vespalib::string& protocolName = message->getProtocol(); if (protocolName == documentapi::DocumentProtocol::NAME) { - convertedReply.reset(static_cast( - message.get())->createReply().release()); + convertedReply.reset(static_cast(message.get())->createReply().release()); } else if (protocolName == mbusprot::StorageProtocol::NAME) { std::shared_ptr repl( - static_cast(message.get()) - ->getCommand()->makeReply().release()); - mbusprot::StorageReply::UP sreply( - new mbusprot::StorageReply(repl)); + static_cast(message.get())->getCommand()->makeReply().release()); + mbusprot::StorageReply::UP sreply(new mbusprot::StorageReply(repl)); if (reply->hasErrors()) { // Convert only the first error since storageapi only // supports one return code. uint32_t mbuscode = reply->getError(0).getCode(); - api::ReturnCode::Result code( - (api::ReturnCode::Result) mbuscode); + api::ReturnCode::Result code((api::ReturnCode::Result) mbuscode); // Encode mbuscode into message not to lose it sreply->getReply()->setResult(storage::api::ReturnCode( code, @@ -232,8 +213,7 @@ CommunicationManager::handleReply(std::unique_ptr reply) } convertedReply.reset(sreply.release()); } else { - LOG(warning, "Received reply of unhandled protocol '%s'", - protocolName.c_str()); + LOG(warning, "Received reply of unhandled protocol '%s'", protocolName.c_str()); return; } @@ -242,8 +222,7 @@ CommunicationManager::handleReply(std::unique_ptr reply) reply.reset(convertedReply.release()); } if (reply->getType() == 0) { - LOG(warning, "Failed to convert empty reply by reflecting on " - "local message copy."); + LOG(warning, "Failed to convert empty reply by reflecting on local message copy."); return; } } @@ -253,19 +232,15 @@ CommunicationManager::handleReply(std::unique_ptr reply) if (protocolName == documentapi::DocumentProtocol::NAME) { std::shared_ptr originalCommand; - { vespalib::LockGuard lock(_messageBusSentLock); - typedef std::map MessageMap; - MessageMap::iterator iter( - _messageBusSent.find(reply->getContext().value.UINT64)); + typedef std::map MessageMap; + MessageMap::iterator iter(_messageBusSent.find(reply->getContext().value.UINT64)); if (iter != _messageBusSent.end()) { originalCommand.swap(iter->second); _messageBusSent.erase(iter); } else { - LOG(warning, "Failed to convert reply - original sent " - "command doesn't exist"); + LOG(warning, "Failed to convert reply - original sent command doesn't exist"); return; } } @@ -280,13 +255,11 @@ CommunicationManager::handleReply(std::unique_ptr reply) receiveStorageReply(sar); } } else if (protocolName == mbusprot::StorageProtocol::NAME) { - mbusprot::StorageReply* sr( - static_cast(reply.get())); + mbusprot::StorageReply* sr(static_cast(reply.get())); sr->getReply()->setTrace(reply->getTrace()); receiveStorageReply(sr->getReply()); } else { - LOGBM(warning, "Received unsupported reply type %d for protocol " - "'%s'.", + LOGBM(warning, "Received unsupported reply type %d for protocol '%s'.", reply->getType(), reply->getProtocol().c_str()); } } @@ -382,8 +355,7 @@ void CommunicationManager::onClose() while (_eventQueue.size() > 0) { assert(_eventQueue.getNext(msg, 0)); if (!msg->getType().isReply()) { - std::shared_ptr reply( - static_cast(*msg).makeReply().release()); + std::shared_ptr reply(static_cast(*msg).makeReply().release()); reply->setResult(code); sendReply(reply); } @@ -424,18 +396,12 @@ void CommunicationManager::configure( // Configure messagebus here as we for legacy reasons have // config here. - _mbus.reset(new mbus::RPCMessageBus( - mbus::ProtocolSet() - .add(mbus::IProtocol::SP( - new documentapi::DocumentProtocol( - *_component.getLoadTypes(), - _component.getTypeRepo()))) - .add(mbus::IProtocol::SP( - new mbusprot::StorageProtocol( - _component.getTypeRepo(), - *_component.getLoadTypes()))), - params, - _configUri)); + _mbus = std::make_unique( + mbus::ProtocolSet() + .add(std::make_shared(*_component.getLoadTypes(), _component.getTypeRepo())) + .add(std::make_shared(_component.getTypeRepo(), *_component.getLoadTypes())), + params, + _configUri); configureMessageBusLimits(*config); } @@ -459,8 +425,7 @@ void CommunicationManager::configure( void CommunicationManager::process(const std::shared_ptr& msg) { - MBUS_TRACE(msg->getTrace(), 9, - "Communication manager: Sending message down chain."); + MBUS_TRACE(msg->getTrace(), 9, "Communication manager: Sending message down chain."); framework::MilliSecTimer startTime(_component.getClock()); try { LOG(spam, "Process: %s", msg->toString().c_str()); @@ -473,11 +438,9 @@ CommunicationManager::process(const std::shared_ptr& msg) _metrics.messageProcessTime[msg->getLoadType()].addValue( startTime.getElapsedTimeAsDouble()); } catch (std::exception& e) { - LOGBP(error, "When running command %s, caught exception %s. " - "Discarding message", + LOGBP(error, "When running command %s, caught exception %s. Discarding message", msg->toString().c_str(), e.what()); - _metrics.exceptionMessageProcessTime[msg->getLoadType()].addValue( - startTime.getElapsedTimeAsDouble()); + _metrics.exceptionMessageProcessTime[msg->getLoadType()].addValue(startTime.getElapsedTimeAsDouble()); } catch (...) { LOG(fatal, "Caught fatal exception in communication manager"); throw; @@ -498,8 +461,7 @@ CommunicationManager::enqueue(const std::shared_ptr & msg) if (token.get()) { msg->setMemoryToken(std::unique_ptr(token.release())); - LOG(spam, "Enq storage message %s, priority %d", - msg->toString().c_str(), msg->getPriority()); + LOG(spam, "Enq storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); _eventQueue.enqueue(msg); } else { _metrics.failedDueToTooLittleMemory.inc(); @@ -511,10 +473,8 @@ CommunicationManager::enqueue(const std::shared_ptr & msg) api::StorageCommand* cmd(dynamic_cast(msg.get())); if (cmd) { - std::shared_ptr reply( - cmd->makeReply().release()); - reply->setResult(api::ReturnCode( - api::ReturnCode::BUSY, ost.str())); + std::shared_ptr reply(cmd->makeReply().release()); + reply->setResult(api::ReturnCode(api::ReturnCode::BUSY, ost.str())); sendReply(reply); } } @@ -523,20 +483,16 @@ CommunicationManager::enqueue(const std::shared_ptr & msg) bool CommunicationManager::onUp(const std::shared_ptr & msg) { - MBUS_TRACE(msg->getTrace(), 6, - "Communication manager: Sending " + msg->toString()); + MBUS_TRACE(msg->getTrace(), 6, "Communication manager: Sending " + msg->toString()); if (msg->getType().isReply()) { if (static_cast(*msg).getResult().failed()) { LOG(debug, "Request %s failed: %s", - msg->getType().toString().c_str(), - static_cast(*msg) - .getResult().toString().c_str()); + msg->getType().toString().c_str(), + static_cast(*msg).getResult().toString().c_str()); } - return sendReply( - std::static_pointer_cast(msg)); + return sendReply(std::static_pointer_cast(msg)); } else { - return sendCommand( - std::static_pointer_cast(msg)); + return sendCommand(std::static_pointer_cast(msg)); } } @@ -562,12 +518,9 @@ CommunicationManager::sendMessageBusMessage( std::shared_ptr reply(msg->makeReply().release()); if (reply.get()) { if (result.getError().getCode() > mbus::ErrorCode::FATAL_ERROR) { - reply->setResult(api::ReturnCode( - api::ReturnCode::ABORTED, - result.getError().getMessage())); + reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, result.getError().getMessage())); } else { - reply->setResult(api::ReturnCode( - api::ReturnCode::BUSY, result.getError().getMessage())); + reply->setResult(api::ReturnCode(api::ReturnCode::BUSY, result.getError().getMessage())); } } else { LOG(spam, "Failed to synthesize reply"); @@ -582,18 +535,17 @@ CommunicationManager::sendCommand( const std::shared_ptr & msg) { if (!msg->getAddress()) { - LOGBP(warning, "Got command without address of type %s in " - "CommunicationManager::sendCommand", - msg->getType().getName().c_str()); + LOGBP(warning, "Got command without address of type %s in CommunicationManager::sendCommand", + msg->getType().getName().c_str()); return false; } if (!msg->sourceIndexSet()) { msg->setSourceIndex(_component.getIndex()); } - // Components can not specify what storage node to send to - // without specifying protocol. This is a workaround, such that code - // doesn't have to care whether message is in documentapi or storage - // protocol. + // Components can not specify what storage node to send to + // without specifying protocol. This is a workaround, such that code + // doesn't have to care whether message is in documentapi or storage + // protocol. api::StorageMessageAddress address(*msg->getAddress()); switch (msg->getType().getId()) { case api::MessageType::STATBUCKET_ID: { @@ -609,9 +561,7 @@ CommunicationManager::sendCommand( switch (address.getProtocol()) { case api::StorageMessageAddress::STORAGE: { - LOG(spam, "Send to %s: %s", - address.toString().c_str(), - msg->toString().c_str()); + LOG(spam, "Send to %s: %s", address.toString().c_str(), msg->toString().c_str()); std::unique_ptr cmd(new mbusprot::StorageCommand(msg)); @@ -624,16 +574,12 @@ CommunicationManager::sendCommand( } case api::StorageMessageAddress::DOCUMENT: { - MBUS_TRACE(msg->getTrace(), 7, - "Communication manager: Converting storageapi message to " - "documentapi"); + MBUS_TRACE(msg->getTrace(), 7, "Communication manager: Converting storageapi message to documentapi"); - std::unique_ptr mbusMsg( - _docApiConverter.toDocumentAPI(*msg, _component.getTypeRepo())); + std::unique_ptr mbusMsg(_docApiConverter.toDocumentAPI(*msg, _component.getTypeRepo())); if (mbusMsg.get()) { - MBUS_TRACE(msg->getTrace(), 7, - "Communication manager: Converted OK"); + MBUS_TRACE(msg->getTrace(), 7, "Communication manager: Converted OK"); mbusMsg->setTrace(msg->getTrace()); mbusMsg->setRetryEnabled(address.retryEnabled()); @@ -665,13 +611,10 @@ CommunicationManager::serializeNodeState( { vespalib::asciistream tmp; if (gns.hasNodeState()) { - gns.getNodeState().serialize( - tmp, "", includeDescription, - includeDiskDescription, useOldFormat); + gns.getNodeState().serialize(tmp, "", includeDescription, includeDiskDescription, useOldFormat); } else { - _component.getStateUpdater().getReportedNodeState()->serialize( - tmp, "", includeDescription, - includeDiskDescription, useOldFormat); + _component.getStateUpdater().getReportedNodeState()->serialize(tmp, "", includeDescription, + includeDiskDescription, useOldFormat); } os << tmp.str(); } @@ -683,17 +626,14 @@ CommunicationManager::sendDirectRPCReply( { std::string requestName(request.getMethodName()); if (requestName == "getnodestate3") { - api::GetNodeStateReply& gns( - static_cast(*reply)); + api::GetNodeStateReply& gns(static_cast(*reply)); std::ostringstream ns; serializeNodeState(gns, ns, true, true, false); request.addReturnString(ns.str().c_str()); request.addReturnString(gns.getNodeInfo().c_str()); - LOGBP(debug, "Sending getnodestate3 reply with host info '%s'.", - gns.getNodeInfo().c_str()); + LOGBP(debug, "Sending getnodestate3 reply with host info '%s'.", gns.getNodeInfo().c_str()); } else if (requestName == "getnodestate2") { - api::GetNodeStateReply& gns( - static_cast(*reply)); + api::GetNodeStateReply& gns(static_cast(*reply)); std::ostringstream ns; serializeNodeState(gns, ns, true, true, false); request.addReturnString(ns.str().c_str()); @@ -705,13 +645,11 @@ CommunicationManager::sendDirectRPCReply( request.addReturnString(reply->getResult().getMessage().c_str()); if (reply->getType() == api::MessageType::GETNODESTATE_REPLY) { - api::GetNodeStateReply& gns( - static_cast(*reply)); + api::GetNodeStateReply& gns(static_cast(*reply)); std::ostringstream ns; serializeNodeState(gns, ns, false, false, true); request.addReturnString(ns.str().c_str()); - request.addReturnInt(static_cast( - gns.getNodeState().getInitProgress().getValue() * 100)); + request.addReturnInt(static_cast(gns.getNodeState().getInitProgress().getValue() * 100)); } } @@ -731,35 +669,28 @@ CommunicationManager::sendMessageBusReply( // If this was originally documentapi, create a reply now and transfer the // state. if (context._docAPIMsg.get()) { - if (reply->getResult().getResult() - == api::ReturnCode::WRONG_DISTRIBUTION) - { - replyUP.reset(new documentapi::WrongDistributionReply( - reply->getResult().getMessage())); + if (reply->getResult().getResult() == api::ReturnCode::WRONG_DISTRIBUTION) { + replyUP.reset(new documentapi::WrongDistributionReply(reply->getResult().getMessage())); replyUP->swapState(*context._docAPIMsg); replyUP->setTrace(reply->getTrace()); - replyUP->addError(mbus::Error( - documentapi::DocumentProtocol::ERROR_WRONG_DISTRIBUTION, - reply->getResult().getMessage())); + replyUP->addError(mbus::Error(documentapi::DocumentProtocol::ERROR_WRONG_DISTRIBUTION, + reply->getResult().getMessage())); } else { replyUP = context._docAPIMsg->createReply(); replyUP->swapState(*context._docAPIMsg); replyUP->setTrace(reply->getTrace()); - replyUP->setMessage(std::unique_ptr( - context._docAPIMsg.release())); + replyUP->setMessage(std::unique_ptr(context._docAPIMsg.release())); _docApiConverter.transferReplyState(*reply, *replyUP); } } else if (context._storageProtocolMsg.get()) { replyUP.reset(new mbusprot::StorageReply(reply)); if (reply->getResult().getResult() != api::ReturnCode::OK) { - replyUP->addError(mbus::Error(reply->getResult().getResult(), - reply->getResult().getMessage())); + replyUP->addError(mbus::Error(reply->getResult().getResult(), reply->getResult().getMessage())); } replyUP->swapState(*context._storageProtocolMsg); replyUP->setTrace(reply->getTrace()); - replyUP->setMessage(mbus::Message::UP( - context._storageProtocolMsg.release())); + replyUP->setMessage(mbus::Message::UP(context._storageProtocolMsg.release())); } if (replyUP.get() != NULL) { @@ -784,19 +715,13 @@ CommunicationManager::sendReply( // Relaxed load since we're not doing any dependent reads that aren't // already covered by some other form of explicit synchronization. if (_closed.load(std::memory_order_relaxed)) { - reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, - "Node is shutting down")); + reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Node is shutting down")); } - std::unique_ptr context( - static_cast( - reply->getTransportContext().release())); + std::unique_ptr context(static_cast(reply->getTransportContext().release())); if (!context.get()) { - LOG(spam, - "No transport context in reply %s", - reply->toString().c_str()); - + LOG(spam, "No transport context in reply %s", reply->toString().c_str()); return false; } @@ -837,8 +762,7 @@ CommunicationManager::updateMetrics(const MetricLockGuard &) } void -CommunicationManager::print(std::ostream& out, bool verbose, - const std::string& indent) const +CommunicationManager::print(std::ostream& out, bool verbose, const std::string& indent) const { (void) verbose; (void) indent; out << "CommunicationManager"; diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 6c8923b4c08..921ca1400fa 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -146,19 +146,14 @@ private: void process(const std::shared_ptr& msg); - using CommunicationManagerConfig - = vespa::config::content::core::StorCommunicationmanagerConfig; + using CommunicationManagerConfig= vespa::config::content::core::StorCommunicationmanagerConfig; void configureMessageBusLimits(const CommunicationManagerConfig& cfg); void configure(std::unique_ptr config) override; void receiveStorageReply(const std::shared_ptr&); - void serializeNodeState( - const api::GetNodeStateReply& gns, - std::ostream& os, - bool includeDescription, - bool includeDiskDescription, - bool useOldFormat) const; + void serializeNodeState(const api::GetNodeStateReply& gns, std::ostream& os, bool includeDescription, + bool includeDiskDescription, bool useOldFormat) const; static const uint64_t FORWARDED_MESSAGE = 0; -- cgit v1.2.3