diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-02 18:13:20 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-02 18:13:20 +0000 |
commit | a2c8ad475de58e6686e484610488bc4adae30273 (patch) | |
tree | 819806f6444f831a119ea2786bed17c66df0d403 | |
parent | fc202c24fcf4eb4a69a083d3e42022e93c0b8653 (diff) |
Bypass communicationmanager Q
4 files changed, 10 insertions, 14 deletions
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp index 6657a9f1600..8444319b395 100644 --- a/storage/src/tests/storageserver/communicationmanagertest.cpp +++ b/storage/src/tests/storageserver/communicationmanagertest.cpp @@ -158,6 +158,7 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) { storConfig.getConfigId()); DummyStorageLink *storageLink = new DummyStorageLink(); storage.push_back(std::unique_ptr<StorageLink>(storageLink)); + storage.open(); // Message dequeing does not start before we invoke `open` on the storage // link chain, so we enqueue messages in randomized priority order before @@ -168,7 +169,6 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) { for (auto pri : pris) { storage.enqueue(createDummyCommand(pri)); } - storage.open(); storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC); for (size_t i = 0; i < pris.size(); ++i) { @@ -191,12 +191,12 @@ TEST_F(CommunicationManagerTest, replies_are_dequeued_in_fifo_order) { storConfig.getConfigId()); DummyStorageLink *storageLink = new DummyStorageLink(); storage.push_back(std::unique_ptr<StorageLink>(storageLink)); + storage.open(); std::vector<api::StorageMessage::Priority> pris{200, 0, 255, 128}; for (auto pri : pris) { storage.enqueue(createDummyCommand(pri)->makeReply()); } - storage.open(); storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC); // Want FIFO order for replies, not priority-sorted order. diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index 431c90b27f2..a593cc913a8 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -123,9 +123,9 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) default: LOG(error, "Link %s trying to send %s down while in state %s", toString().c_str(), msg->toString().c_str(), stateToString(getState())); - assert(false); + return; } - assert(msg.get()); + assert(msg); LOG(spam, "Storage Link %s to handle %s", toString().c_str(), msg->toString().c_str()); if (isBottom()) { LOG(spam, "Storage link %s at bottom of chain got message %s.", toString().c_str(), msg->toString().c_str()); @@ -165,9 +165,9 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) default: LOG(error, "Link %s trying to send %s up while in state %s", toString().c_str(), msg->toString(true).c_str(), stateToString(getState())); - assert(false); + return; } - assert(msg.get()); + assert(msg); if (isTop()) { ostringstream ost; ost << "Unhandled message at top of chain " << *msg << "."; diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index e7d1f06bbd7..aff2b0f624f 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -14,13 +14,11 @@ #include <vespa/storageapi/message/state.h> #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/vespalib/stllike/asciistream.h> -#include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/util/stringfmt.h> - -#include <vespa/log/bufferedlogger.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/documentapi/messagebus/messages/getdocumentreply.h> +#include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/log/bufferedlogger.h> LOG_SETUP(".communication.manager"); using vespalib::make_string; @@ -302,7 +300,6 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co _listener(), _eventQueue(), _mbus(), - _count(0), _configUri(configUri), _closed(false), _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()) @@ -492,8 +489,8 @@ void CommunicationManager::enqueue(std::shared_ptr<api::StorageMessage> msg) { assert(msg); - LOG(spam, "Enq storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); - _eventQueue.enqueue(std::move(msg)); + LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); + process(msg); } bool diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index a0ae4bf3b43..8983dbdf057 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -133,7 +133,6 @@ private: std::unique_ptr<mbus::RPCMessageBus> _mbus; std::unique_ptr<mbus::DestinationSession> _messageBusSession; std::unique_ptr<mbus::SourceSession> _sourceSession; - uint32_t _count; vespalib::Lock _messageBusSentLock; std::map<api::StorageMessage::Id, std::shared_ptr<api::StorageCommand> > _messageBusSent; |