summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-05 19:27:10 +0200
committerGitHub <noreply@github.com>2020-04-05 19:27:10 +0200
commita4132d1d8bc228bad9de8e986a777fa1e9e6939d (patch)
tree28d19408c5a2f2a3ce976d55ac4de5e33580a6d3 /storage
parent14443fdcab31276ae11684981bf4bb055e3bffdc (diff)
parentda7e22058e25ff752090981209203b281633e5f8 (diff)
Merge branch 'master' into balder/move-sequenced-task-executors-to-staging_vespalib
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/communicationmanagertest.cpp4
-rw-r--r--storage/src/vespa/storage/common/storagelink.cpp8
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def10
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp26
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h3
5 files changed, 19 insertions, 32 deletions
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp
index 8444319b395..6657a9f1600 100644
--- a/storage/src/tests/storageserver/communicationmanagertest.cpp
+++ b/storage/src/tests/storageserver/communicationmanagertest.cpp
@@ -158,7 +158,6 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) {
storConfig.getConfigId());
DummyStorageLink *storageLink = new DummyStorageLink();
storage.push_back(std::unique_ptr<StorageLink>(storageLink));
- storage.open();
// Message dequeing does not start before we invoke `open` on the storage
// link chain, so we enqueue messages in randomized priority order before
@@ -169,6 +168,7 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) {
for (auto pri : pris) {
storage.enqueue(createDummyCommand(pri));
}
+ storage.open();
storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC);
for (size_t i = 0; i < pris.size(); ++i) {
@@ -191,12 +191,12 @@ TEST_F(CommunicationManagerTest, replies_are_dequeued_in_fifo_order) {
storConfig.getConfigId());
DummyStorageLink *storageLink = new DummyStorageLink();
storage.push_back(std::unique_ptr<StorageLink>(storageLink));
- storage.open();
std::vector<api::StorageMessage::Priority> pris{200, 0, 255, 128};
for (auto pri : pris) {
storage.enqueue(createDummyCommand(pri)->makeReply());
}
+ storage.open();
storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC);
// Want FIFO order for replies, not priority-sorted order.
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp
index a593cc913a8..431c90b27f2 100644
--- a/storage/src/vespa/storage/common/storagelink.cpp
+++ b/storage/src/vespa/storage/common/storagelink.cpp
@@ -123,9 +123,9 @@ void StorageLink::sendDown(const StorageMessage::SP& msg)
default:
LOG(error, "Link %s trying to send %s down while in state %s",
toString().c_str(), msg->toString().c_str(), stateToString(getState()));
- return;
+ assert(false);
}
- assert(msg);
+ assert(msg.get());
LOG(spam, "Storage Link %s to handle %s", toString().c_str(), msg->toString().c_str());
if (isBottom()) {
LOG(spam, "Storage link %s at bottom of chain got message %s.", toString().c_str(), msg->toString().c_str());
@@ -165,9 +165,9 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg)
default:
LOG(error, "Link %s trying to send %s up while in state %s",
toString().c_str(), msg->toString(true).c_str(), stateToString(getState()));
- return;
+ assert(false);
}
- assert(msg);
+ assert(msg.get());
if (isTop()) {
ostringstream ost;
ost << "Unhandled message at top of chain " << *msg << ".";
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 52dce733321..c855a4e683a 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -29,15 +29,11 @@ mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4
## TTL for rpc target cache
mbus.rpctargetcache.ttl double default = 600
-## Number of threads for network.
+## Number of threads for mbus threadpool
## Any value below 1 will be 1.
-mbus.num_network_threads int default=2
+mbus.num_threads int default=4
-## Number of workers threads for messagebus.
-## Any value below 1 will be 1.
-mbus.num_threads int default=1
-
-mbus.optimize_for enum {LATENCY, THROUGHPUT, ADAPTIVE} default = THROUGHPUT
+mbus.optimize_for enum {LATENCY, THROUGHPUT, ADAPTIVE} default = LATENCY
## Enable to use above thread pool for encoding replies
## False will use network(fnet) thread
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 19c157ffbd2..fa2b0cda018 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -14,16 +14,17 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/vespalib/stllike/asciistream.h>
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/log/bufferedlogger.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/documentapi/messagebus/messages/getdocumentreply.h>
+
LOG_SETUP(".communication.manager");
using vespalib::make_string;
using document::FixedBucketSpaces;
-using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
namespace storage {
@@ -280,17 +281,6 @@ struct PlaceHolderBucketResolver : public BucketResolver {
}
};
-mbus::RPCNetworkParams::OptimizeFor
-convert(CommunicationManagerConfig::Mbus::OptimizeFor optimizeFor) {
- switch (optimizeFor) {
- case CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY:
- return mbus::RPCNetworkParams::OptimizeFor::LATENCY;
- case CommunicationManagerConfig::Mbus::OptimizeFor::THROUGHPUT:
- default:
- return mbus::RPCNetworkParams::OptimizeFor::THROUGHPUT;
- }
-}
-
}
CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri)
@@ -300,6 +290,7 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co
_listener(),
_eventQueue(),
_mbus(),
+ _count(0),
_configUri(configUri),
_closed(false),
_docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>())
@@ -422,10 +413,9 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
mbus::RPCNetworkParams params(_configUri);
params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl);
params.setNumThreads(std::max(1, config->mbus.numThreads));
- params.setNumNetworkThreads(std::max(1, config->mbus.numNetworkThreads));
params.setDispatchOnDecode(config->mbus.dispatchOnDecode);
params.setDispatchOnEncode(config->mbus.dispatchOnEncode);
- params.setOptimizeFor(convert(config->mbus.optimizeFor));
+ params.setTcpNoDelay(config->mbus.optimizeFor == CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY);
params.setIdentity(mbus::Identity(_component.getIdentity()));
if (config->mbusport != -1) {
@@ -490,8 +480,8 @@ void
CommunicationManager::enqueue(std::shared_ptr<api::StorageMessage> msg)
{
assert(msg);
- LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
- process(msg);
+ LOG(spam, "Enq storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
+ _eventQueue.enqueue(std::move(msg));
}
bool
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 8983dbdf057..c08ad214768 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -116,7 +116,7 @@ private:
void process(const std::shared_ptr<api::StorageMessage>& msg);
- using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
+ using CommunicationManagerConfig= vespa::config::content::core::StorCommunicationmanagerConfig;
using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig;
void configureMessageBusLimits(const CommunicationManagerConfig& cfg);
@@ -133,6 +133,7 @@ private:
std::unique_ptr<mbus::RPCMessageBus> _mbus;
std::unique_ptr<mbus::DestinationSession> _messageBusSession;
std::unique_ptr<mbus::SourceSession> _sourceSession;
+ uint32_t _count;
vespalib::Lock _messageBusSentLock;
std::map<api::StorageMessage::Id, std::shared_ptr<api::StorageCommand> > _messageBusSent;