aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-02 23:29:18 +0200
committerGitHub <noreply@github.com>2020-04-02 23:29:18 +0200
commit4de936c04e19e372dcdc47f6c4ed1e50a2d7c433 (patch)
treefc9e2a47701be2523c1217dacafdcb3db3e91be5
parenteebaa4851a44f9c6faea921d7a97913e926cec1d (diff)
parenta2c8ad475de58e6686e484610488bc4adae30273 (diff)
Merge pull request #12813 from vespa-engine/balder/bypass-communicationmanager-queue
Bypass communicationmanager Q
-rw-r--r--storage/src/tests/storageserver/communicationmanagertest.cpp4
-rw-r--r--storage/src/vespa/storage/common/storagelink.cpp8
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp11
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h1
4 files changed, 10 insertions, 14 deletions
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp
index 6657a9f1600..8444319b395 100644
--- a/storage/src/tests/storageserver/communicationmanagertest.cpp
+++ b/storage/src/tests/storageserver/communicationmanagertest.cpp
@@ -158,6 +158,7 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) {
storConfig.getConfigId());
DummyStorageLink *storageLink = new DummyStorageLink();
storage.push_back(std::unique_ptr<StorageLink>(storageLink));
+ storage.open();
// Message dequeing does not start before we invoke `open` on the storage
// link chain, so we enqueue messages in randomized priority order before
@@ -168,7 +169,6 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) {
for (auto pri : pris) {
storage.enqueue(createDummyCommand(pri));
}
- storage.open();
storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC);
for (size_t i = 0; i < pris.size(); ++i) {
@@ -191,12 +191,12 @@ TEST_F(CommunicationManagerTest, replies_are_dequeued_in_fifo_order) {
storConfig.getConfigId());
DummyStorageLink *storageLink = new DummyStorageLink();
storage.push_back(std::unique_ptr<StorageLink>(storageLink));
+ storage.open();
std::vector<api::StorageMessage::Priority> pris{200, 0, 255, 128};
for (auto pri : pris) {
storage.enqueue(createDummyCommand(pri)->makeReply());
}
- storage.open();
storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC);
// Want FIFO order for replies, not priority-sorted order.
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp
index 431c90b27f2..a593cc913a8 100644
--- a/storage/src/vespa/storage/common/storagelink.cpp
+++ b/storage/src/vespa/storage/common/storagelink.cpp
@@ -123,9 +123,9 @@ void StorageLink::sendDown(const StorageMessage::SP& msg)
default:
LOG(error, "Link %s trying to send %s down while in state %s",
toString().c_str(), msg->toString().c_str(), stateToString(getState()));
- assert(false);
+ return;
}
- assert(msg.get());
+ assert(msg);
LOG(spam, "Storage Link %s to handle %s", toString().c_str(), msg->toString().c_str());
if (isBottom()) {
LOG(spam, "Storage link %s at bottom of chain got message %s.", toString().c_str(), msg->toString().c_str());
@@ -165,9 +165,9 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg)
default:
LOG(error, "Link %s trying to send %s up while in state %s",
toString().c_str(), msg->toString(true).c_str(), stateToString(getState()));
- assert(false);
+ return;
}
- assert(msg.get());
+ assert(msg);
if (isTop()) {
ostringstream ost;
ost << "Unhandled message at top of chain " << *msg << ".";
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index e7d1f06bbd7..aff2b0f624f 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -14,13 +14,11 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/vespalib/stllike/asciistream.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/util/stringfmt.h>
-
-#include <vespa/log/bufferedlogger.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <vespa/documentapi/messagebus/messages/getdocumentreply.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".communication.manager");
using vespalib::make_string;
@@ -302,7 +300,6 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co
_listener(),
_eventQueue(),
_mbus(),
- _count(0),
_configUri(configUri),
_closed(false),
_docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>())
@@ -492,8 +489,8 @@ void
CommunicationManager::enqueue(std::shared_ptr<api::StorageMessage> msg)
{
assert(msg);
- LOG(spam, "Enq storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
- _eventQueue.enqueue(std::move(msg));
+ LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
+ process(msg);
}
bool
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index a0ae4bf3b43..8983dbdf057 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -133,7 +133,6 @@ private:
std::unique_ptr<mbus::RPCMessageBus> _mbus;
std::unique_ptr<mbus::DestinationSession> _messageBusSession;
std::unique_ptr<mbus::SourceSession> _sourceSession;
- uint32_t _count;
vespalib::Lock _messageBusSentLock;
std::map<api::StorageMessage::Id, std::shared_ptr<api::StorageCommand> > _messageBusSent;