diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-05 19:27:10 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-05 19:27:10 +0200 |
commit | a4132d1d8bc228bad9de8e986a777fa1e9e6939d (patch) | |
tree | 28d19408c5a2f2a3ce976d55ac4de5e33580a6d3 /storage | |
parent | 14443fdcab31276ae11684981bf4bb055e3bffdc (diff) | |
parent | da7e22058e25ff752090981209203b281633e5f8 (diff) |
Merge branch 'master' into balder/move-sequenced-task-executors-to-staging_vespalib
Diffstat (limited to 'storage')
5 files changed, 19 insertions, 32 deletions
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp index 8444319b395..6657a9f1600 100644 --- a/storage/src/tests/storageserver/communicationmanagertest.cpp +++ b/storage/src/tests/storageserver/communicationmanagertest.cpp @@ -158,7 +158,6 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) { storConfig.getConfigId()); DummyStorageLink *storageLink = new DummyStorageLink(); storage.push_back(std::unique_ptr<StorageLink>(storageLink)); - storage.open(); // Message dequeing does not start before we invoke `open` on the storage // link chain, so we enqueue messages in randomized priority order before @@ -169,6 +168,7 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) { for (auto pri : pris) { storage.enqueue(createDummyCommand(pri)); } + storage.open(); storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC); for (size_t i = 0; i < pris.size(); ++i) { @@ -191,12 +191,12 @@ TEST_F(CommunicationManagerTest, replies_are_dequeued_in_fifo_order) { storConfig.getConfigId()); DummyStorageLink *storageLink = new DummyStorageLink(); storage.push_back(std::unique_ptr<StorageLink>(storageLink)); - storage.open(); std::vector<api::StorageMessage::Priority> pris{200, 0, 255, 128}; for (auto pri : pris) { storage.enqueue(createDummyCommand(pri)->makeReply()); } + storage.open(); storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC); // Want FIFO order for replies, not priority-sorted order. diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index a593cc913a8..431c90b27f2 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -123,9 +123,9 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) default: LOG(error, "Link %s trying to send %s down while in state %s", toString().c_str(), msg->toString().c_str(), stateToString(getState())); - return; + assert(false); } - assert(msg); + assert(msg.get()); LOG(spam, "Storage Link %s to handle %s", toString().c_str(), msg->toString().c_str()); if (isBottom()) { LOG(spam, "Storage link %s at bottom of chain got message %s.", toString().c_str(), msg->toString().c_str()); @@ -165,9 +165,9 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) default: LOG(error, "Link %s trying to send %s up while in state %s", toString().c_str(), msg->toString(true).c_str(), stateToString(getState())); - return; + assert(false); } - assert(msg); + assert(msg.get()); if (isTop()) { ostringstream ost; ost << "Unhandled message at top of chain " << *msg << "."; diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index 52dce733321..c855a4e683a 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -29,15 +29,11 @@ mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 ## TTL for rpc target cache mbus.rpctargetcache.ttl double default = 600 -## Number of threads for network. +## Number of threads for mbus threadpool ## Any value below 1 will be 1. -mbus.num_network_threads int default=2 +mbus.num_threads int default=4 -## Number of workers threads for messagebus. -## Any value below 1 will be 1. -mbus.num_threads int default=1 - -mbus.optimize_for enum {LATENCY, THROUGHPUT, ADAPTIVE} default = THROUGHPUT +mbus.optimize_for enum {LATENCY, THROUGHPUT, ADAPTIVE} default = LATENCY ## Enable to use above thread pool for encoding replies ## False will use network(fnet) thread diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 19c157ffbd2..fa2b0cda018 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -14,16 +14,17 @@ #include <vespa/storageapi/message/state.h> #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/vespalib/stllike/asciistream.h> -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/util/stringfmt.h> #include <vespa/log/bufferedlogger.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/documentapi/messagebus/messages/getdocumentreply.h> + LOG_SETUP(".communication.manager"); using vespalib::make_string; using document::FixedBucketSpaces; -using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig; namespace storage { @@ -280,17 +281,6 @@ struct PlaceHolderBucketResolver : public BucketResolver { } }; -mbus::RPCNetworkParams::OptimizeFor -convert(CommunicationManagerConfig::Mbus::OptimizeFor optimizeFor) { - switch (optimizeFor) { - case CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY: - return mbus::RPCNetworkParams::OptimizeFor::LATENCY; - case CommunicationManagerConfig::Mbus::OptimizeFor::THROUGHPUT: - default: - return mbus::RPCNetworkParams::OptimizeFor::THROUGHPUT; - } -} - } CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri) @@ -300,6 +290,7 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co _listener(), _eventQueue(), _mbus(), + _count(0), _configUri(configUri), _closed(false), _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()) @@ -422,10 +413,9 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> mbus::RPCNetworkParams params(_configUri); params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl); params.setNumThreads(std::max(1, config->mbus.numThreads)); - params.setNumNetworkThreads(std::max(1, config->mbus.numNetworkThreads)); params.setDispatchOnDecode(config->mbus.dispatchOnDecode); params.setDispatchOnEncode(config->mbus.dispatchOnEncode); - params.setOptimizeFor(convert(config->mbus.optimizeFor)); + params.setTcpNoDelay(config->mbus.optimizeFor == CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY); params.setIdentity(mbus::Identity(_component.getIdentity())); if (config->mbusport != -1) { @@ -490,8 +480,8 @@ void CommunicationManager::enqueue(std::shared_ptr<api::StorageMessage> msg) { assert(msg); - LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); - process(msg); + LOG(spam, "Enq storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); + _eventQueue.enqueue(std::move(msg)); } bool diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 8983dbdf057..c08ad214768 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -116,7 +116,7 @@ private: void process(const std::shared_ptr<api::StorageMessage>& msg); - using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig; + using CommunicationManagerConfig= vespa::config::content::core::StorCommunicationmanagerConfig; using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig; void configureMessageBusLimits(const CommunicationManagerConfig& cfg); @@ -133,6 +133,7 @@ private: std::unique_ptr<mbus::RPCMessageBus> _mbus; std::unique_ptr<mbus::DestinationSession> _messageBusSession; std::unique_ptr<mbus::SourceSession> _sourceSession; + uint32_t _count; vespalib::Lock _messageBusSentLock; std::map<api::StorageMessage::Id, std::shared_ptr<api::StorageCommand> > _messageBusSent; |