diff options
3 files changed, 35 insertions, 23 deletions
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp index 3e08a6d3901..ba5e0c3c116 100644 --- a/storage/src/tests/storageserver/communicationmanagertest.cpp +++ b/storage/src/tests/storageserver/communicationmanagertest.cpp @@ -6,6 +6,7 @@ #include <vespa/messagebus/rpcmessagebus.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> +#include <vespa/storage/persistence/messages.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <tests/common/teststorageapp.h> #include <tests/common/dummystoragelink.h> @@ -331,4 +332,11 @@ TEST_F(CommunicationManagerTest, unmapped_bucket_space_for_get_documentapi_reque EXPECT_EQ(uint64_t(1), f.comm_mgr->metrics().bucketSpaceMappingFailures.getValue()); } +TEST_F(CommunicationManagerTest, communication_manager_swallows_internal_replies) { + CommunicationManagerFixture f; + auto msg = std::make_unique<RecheckBucketInfoCommand>(makeDocumentBucket({16, 1})); + auto reply = std::shared_ptr<api::StorageReply>(msg->makeReply()); + EXPECT_TRUE(f.comm_mgr->onUp(reply)); // true == handled by storage link +} + } // storage diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index b94ef03fad5..fe753d9c350 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -9,8 +9,6 @@ #include <vespa/log/bufferedlogger.h> LOG_SETUP(".application.link"); -using std::shared_ptr; -using std::ostringstream; using namespace storage::api; namespace storage { @@ -46,14 +44,16 @@ void StorageLink::open() assert(false); } link->_state = OPENED; - if (link->_down.get() == 0) break; + if (!link->_down) { + break; + } link = link->_down.get(); } // When give all links an onOpen call, bottoms up. Do it bottoms up, as // links are more likely to send messages down in their onOpen() call // than up. Thus, chances are best that the component is ready to // receive messages sent during onOpen(). - while (link != 0) { + while (link != nullptr) { link->onOpen(); link = link->_up; } @@ -64,7 +64,9 @@ void StorageLink::doneInit() StorageLink* link = this; while (true) { link->onDoneInit(); - if (link->_down.get() == 0) break; + if (!link->_down) { + break; + } link = link->_down.get(); } } @@ -79,7 +81,7 @@ void StorageLink::close() } void StorageLink::closeNextLink() { - _down.reset(0); + _down.reset(); } void StorageLink::flush() @@ -114,7 +116,7 @@ void StorageLink::flush() void StorageLink::sendDown(const StorageMessage::SP& msg) { - // Verify acceptable state to send messages down + // Verify acceptable state to send messages down switch(getState()) { case OPENED: case CLOSING: @@ -129,20 +131,20 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) LOG(spam, "Storage Link %s to handle %s", toString().c_str(), msg->toString().c_str()); if (isBottom()) { LOG(spam, "Storage link %s at bottom of chain got message %s.", toString().c_str(), msg->toString().c_str()); - ostringstream ost; + std::ostringstream ost; ost << "Unhandled message at bottom of chain " << *msg << " (message type " << msg->getType().getName() << "). " << vespalib::getStackTrace(0); if (!msg->getType().isReply()) { LOGBP(warning, "%s", ost.str().c_str()); - StorageCommand& cmd = static_cast<StorageCommand&>(*msg); - shared_ptr<StorageReply> reply(cmd.makeReply().release()); + auto& cmd = dynamic_cast<StorageCommand&>(*msg); + std::shared_ptr<StorageReply> reply(cmd.makeReply()); - if (reply.get()) { + if (reply) { reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, msg->getType().getName())); sendUp(reply); } } else { - ost << " Return code: " << static_cast<const StorageReply&>(*msg).getResult(); + ost << " Return code: " << dynamic_cast<const StorageReply&>(*msg).getResult(); LOGBP(warning, "%s", ost.str().c_str()); } } else if (!_down->onDown(msg)) { @@ -153,7 +155,7 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) } } -void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) +void StorageLink::sendUp(const std::shared_ptr<StorageMessage> & msg) { // Verify acceptable state to send messages up switch(getState()) { @@ -169,20 +171,20 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) } assert(msg); if (isTop()) { - ostringstream ost; + std::ostringstream ost; ost << "Unhandled message at top of chain " << *msg << "."; ost << vespalib::getStackTrace(0); if (!msg->getType().isReply()) { LOGBP(warning, "%s", ost.str().c_str()); - auto& cmd = static_cast<StorageCommand&>(*msg); - shared_ptr<StorageReply> reply(cmd.makeReply().release()); + auto& cmd = dynamic_cast<StorageCommand&>(*msg); + std::shared_ptr<StorageReply> reply(cmd.makeReply()); if (reply.get()) { reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, msg->getType().getName())); sendDown(reply); } } else { - ost << " Return code: " << static_cast<const StorageReply&>(*msg).getResult(); + ost << " Return code: " << dynamic_cast<const StorageReply&>(*msg).getResult(); LOGBP(warning, "%s", ost.str().c_str()); } } else if (!_up->onUp(msg)) { @@ -195,7 +197,7 @@ void StorageLink::printChain(std::ostream& out, std::string indent) const { if (!isTop()) out << ", not top"; out << ")"; const StorageLink* lastlink = _up; - for (const StorageLink* link = this; link != 0; link = link->_down.get()) { + for (const StorageLink* link = this; link != nullptr; link = link->_down.get()) { out << "\n"; link->print(out, false, indent + " "); if (link->_up != lastlink) out << ", broken linkage"; @@ -203,12 +205,12 @@ void StorageLink::printChain(std::ostream& out, std::string indent) const { } } -bool StorageLink::onDown(const shared_ptr<StorageMessage> & msg) +bool StorageLink::onDown(const std::shared_ptr<StorageMessage> & msg) { return msg->callHandler(*this, msg); } -bool StorageLink::onUp(const shared_ptr<StorageMessage> & msg) +bool StorageLink::onUp(const std::shared_ptr<StorageMessage> & msg) { return msg->callHandler(*this, msg); } @@ -236,8 +238,7 @@ StorageLink::stateToString(State state) case CLOSED: return "CLOSED"; default: - assert(false); - return 0; + abort(); } } diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index c09908e93c7..9b67830b3dc 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -737,7 +737,10 @@ CommunicationManager::sendReply( if (!context) { LOG(spam, "No transport context in reply %s", reply->toString().c_str()); - return false; + // If it's an autogenerated reply for an internal message type, just throw it away + // by returning that we've handled it. No one else will handle the reply, the + // alternative is that it ends up as warning noise in the log. + return (reply->getType().getId() == api::MessageType::INTERNAL_REPLY_ID); } framework::MilliSecTimer startTime(_component.getClock()); |