diff options
author | Geir Storli <geirst@verizonmedia.com> | 2020-11-25 12:56:50 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2020-11-25 12:56:50 +0000 |
commit | c090dda709692c07ca492b698af9db3e1a76d8e1 (patch) | |
tree | e56a9ffc11f4b8d2fc3856faf1bdf7b925ff4196 | |
parent | aa903bfbf23d3b855d50de185a15d8061825c778 (diff) |
Create the mbus::Route on demand instead of storing it inside StorageMessageAddress.
Creating and deleting the route is expensive and not used with RPC for Storage API communication.
9 files changed, 45 insertions, 39 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp index e6502d14347..714add169fd 100644 --- a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp +++ b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp @@ -23,7 +23,8 @@ DocumentApiMessageBusBmFeedHandler::DocumentApiMessageBusBmFeedHandler(BmMessage : IBmFeedHandler(), _name(vespalib::string("DocumentApiMessageBusBmFeedHandler(distributor)")), _storage_address(std::make_unique<StorageMessageAddress>("storage", NodeType::DISTRIBUTOR, 0)), - _message_bus(message_bus) + _message_bus(message_bus), + _route(_storage_address->to_mbus_route()) { } @@ -32,7 +33,7 @@ DocumentApiMessageBusBmFeedHandler::~DocumentApiMessageBusBmFeedHandler() = defa void DocumentApiMessageBusBmFeedHandler::send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& pending_tracker) { - _message_bus.send_msg(std::move(msg), _storage_address->getRoute(), pending_tracker); + _message_bus.send_msg(std::move(msg), _route, pending_tracker); } void diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h index 42bc61e587e..52e0b89007f 100644 --- a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h @@ -3,6 +3,7 @@ #pragma once #include "i_bm_feed_handler.h" +#include <vespa/messagebus/routing/route.h> namespace document { class DocumentTypeRepo; } namespace documentapi { class DocumentMessage; }; @@ -21,6 +22,7 @@ class DocumentApiMessageBusBmFeedHandler : public IBmFeedHandler vespalib::string _name; std::unique_ptr<storage::api::StorageMessageAddress> _storage_address; BmMessageBus& _message_bus; + mbus::Route _route; void send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& tracker); public: DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus); diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp index 731b90888ea..18c1b979895 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp @@ -22,7 +22,8 @@ StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBu _name(vespalib::string("StorageApiMessageBusBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), _distributor(distributor), _storage_address(std::make_unique<StorageMessageAddress>("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)), - _message_bus(message_bus) + _message_bus(message_bus), + _route(_storage_address->to_mbus_route()) { } @@ -33,7 +34,7 @@ StorageApiMessageBusBmFeedHandler::send_msg(std::shared_ptr<storage::api::Storag { cmd->setSourceIndex(0); auto msg = std::make_unique<storage::mbusprot::StorageCommand>(cmd); - _message_bus.send_msg(std::move(msg), _storage_address->getRoute(), pending_tracker); + _message_bus.send_msg(std::move(msg), _route, pending_tracker); } void diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h index 82447e1e873..6925053ad43 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h @@ -3,6 +3,7 @@ #pragma once #include "i_bm_feed_handler.h" +#include <vespa/messagebus/routing/route.h> namespace document { class DocumentTypeRepo; } namespace documentapi { class DocumentMessage; }; @@ -25,6 +26,7 @@ class StorageApiMessageBusBmFeedHandler : public IBmFeedHandler bool _distributor; std::unique_ptr<storage::api::StorageMessageAddress> _storage_address; BmMessageBus& _message_bus; + mbus::Route _route; void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); public: StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 940bffdbcc6..09fe0512b6b 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -584,7 +584,7 @@ CommunicationManager::sendCommand( cmd->setRetryEnabled(false); cmd->setTimeRemaining(msg->getTimeout()); cmd->setTrace(msg->steal_trace()); - sendMessageBusMessage(msg, std::move(cmd), address.getRoute()); + sendMessageBusMessage(msg, std::move(cmd), address.to_mbus_route()); } break; } @@ -603,7 +603,7 @@ CommunicationManager::sendCommand( std::lock_guard lock(_messageBusSentLock); _messageBusSent[msg->getMsgId()] = msg; } - sendMessageBusMessage(msg, std::move(mbusMsg), address.getRoute()); + sendMessageBusMessage(msg, std::move(mbusMsg), address.to_mbus_route()); break; } else { LOGBM(warning, "This type of message can't be sent via messagebus"); diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp index c2ebffb23e8..0821211936b 100644 --- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp +++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp @@ -117,7 +117,7 @@ namespace { TEST_F(StorageProtocolTest, testAddress50) { StorageMessageAddress address("foo", lib::NodeType::STORAGE, 3); EXPECT_EQ(vespalib::string("storage/cluster.foo/storage/3/default"), - address.getRoute().toString()); + address.to_mbus_route().toString()); } template<typename Command> std::shared_ptr<Command> diff --git a/storageapi/src/tests/messageapi/storage_message_address_test.cpp b/storageapi/src/tests/messageapi/storage_message_address_test.cpp index 55adcf46c0d..f7f254e9119 100644 --- a/storageapi/src/tests/messageapi/storage_message_address_test.cpp +++ b/storageapi/src/tests/messageapi/storage_message_address_test.cpp @@ -22,17 +22,17 @@ TEST(StorageMessageAddressTest, storage_hash_covers_all_expected_fields) { hash_of("foo", lib::NodeType::DISTRIBUTOR, 0)); EXPECT_EQ(hash_of("foo", lib::NodeType::STORAGE, 123), hash_of("foo", lib::NodeType::STORAGE, 123)); + EXPECT_EQ(hash_of("foo", lib::NodeType::STORAGE, 0), + hash_of("bar", lib::NodeType::STORAGE, 0)); // These tests are all true with extremely high probability, though they do // depend on a hash function that may inherently cause collisions. EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), - hash_of("bar", lib::NodeType::STORAGE, 0)); - EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), hash_of("foo", lib::NodeType::DISTRIBUTOR, 0)); EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), hash_of("foo", lib::NodeType::STORAGE, 1)); - EXPECT_EQ(112u, sizeof(StorageMessageAddress)); + EXPECT_EQ(88u, sizeof(StorageMessageAddress)); EXPECT_EQ(64u, sizeof(StorageMessage)); EXPECT_EQ(16u, sizeof(mbus::Trace)); } diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp index 16f35b8d939..f50c5540f7a 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp @@ -139,15 +139,6 @@ MessageType::print(std::ostream& out, bool verbose, const std::string& indent) c out << ")"; } -StorageMessageAddress::StorageMessageAddress(const mbus::Route& route) - : _route(route), - _cluster(), - _precomputed_storage_hash(0), - _type(nullptr), - _protocol(DOCUMENT), - _index(0xFFFF) -{ } - std::ostream & operator << (std::ostream & os, const StorageMessageAddress & addr) { return os << addr.toString(); } @@ -160,39 +151,50 @@ createAddress(vespalib::stringref cluster, const lib::NodeType& type, uint16_t i return os.str(); } +size_t +calculate_node_hash(const lib::NodeType& type, uint16_t index) +{ + struct NodeValue { + uint16_t type; + uint16_t index; + NodeValue(const lib::NodeType& type_in, uint16_t index_in) : type(type_in), index(index_in) {} + }; + NodeValue value(type, index); + return vespalib::hashValue(&value, sizeof(NodeValue)); +} + // TODO we ideally want this removed. Currently just in place to support usage as map key when emplacement not available StorageMessageAddress::StorageMessageAddress() - : _route(), - _cluster(), + : _cluster(), _precomputed_storage_hash(0), _type(nullptr), _protocol(Protocol::STORAGE), _index(0) {} - StorageMessageAddress::StorageMessageAddress(vespalib::stringref cluster, const lib::NodeType& type, uint16_t index, Protocol protocol) - : _route(), - _cluster(cluster), - _precomputed_storage_hash(0), + : _cluster(cluster), + _precomputed_storage_hash(calculate_node_hash(type, index)), _type(&type), _protocol(protocol), _index(index) { - std::vector<mbus::IHopDirective::SP> directives; - auto address_as_str = createAddress(cluster, type, index); - // We reuse the string representation and pass it to vespalib's hashValue instead of - // explicitly combining a running hash over the individual fields. This is because - // hashValue internally uses xxhash, which offers great dispersion of bits even for - // minimal changes in the input (such as single bit differences in the index). - _precomputed_storage_hash = vespalib::hashValue(address_as_str.data(), address_as_str.size()); - directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(std::move(address_as_str))); - _route.addHop(mbus::Hop(std::move(directives), false)); } StorageMessageAddress::~StorageMessageAddress() = default; +mbus::Route +StorageMessageAddress::to_mbus_route() const +{ + mbus::Route result; + auto address_as_str = createAddress(_cluster, *_type, _index); + std::vector<mbus::IHopDirective::SP> directives; + directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(std::move(address_as_str))); + result.addHop(mbus::Hop(std::move(directives), false)); + return result; +} + uint16_t StorageMessageAddress::getIndex() const { @@ -251,7 +253,7 @@ StorageMessageAddress::print(vespalib::asciistream & out) const out << "Document protocol"; } if (!_type) { - out << ", " << _route.toString() << ")"; + out << ", " << to_mbus_route().toString() << ")"; } else { out << ", cluster " << _cluster << ", nodetype " << *_type << ", index " << _index << ")"; diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h index a2227246a52..98552e473c1 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h @@ -266,7 +266,6 @@ public: enum Protocol { STORAGE, DOCUMENT }; private: - mbus::Route _route; vespalib::string _cluster; // Used for internal VDS addresses only size_t _precomputed_storage_hash; @@ -276,7 +275,6 @@ private: public: StorageMessageAddress(); // Only to be used when transient default ctor semantics are needed by containers - StorageMessageAddress(const mbus::Route& route); StorageMessageAddress(vespalib::stringref clusterName, const lib::NodeType& type, uint16_t index, Protocol protocol = STORAGE); @@ -284,13 +282,13 @@ public: void setProtocol(Protocol p) { _protocol = p; } - const mbus::Route& getRoute() const { return _route; } + mbus::Route to_mbus_route() const; Protocol getProtocol() const { return _protocol; } uint16_t getIndex() const; const lib::NodeType& getNodeType() const; const vespalib::string& getCluster() const; - // Returns precomputed hash over <cluster, type, index> tuple. Other fields not included. + // Returns precomputed hash over <type, index> pair. Other fields not included. [[nodiscard]] size_t internal_storage_hash() const noexcept { return _precomputed_storage_hash; } |