diff options
12 files changed, 69 insertions, 68 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp index e6502d14347..714add169fd 100644 --- a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp +++ b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp @@ -23,7 +23,8 @@ DocumentApiMessageBusBmFeedHandler::DocumentApiMessageBusBmFeedHandler(BmMessage : IBmFeedHandler(), _name(vespalib::string("DocumentApiMessageBusBmFeedHandler(distributor)")), _storage_address(std::make_unique<StorageMessageAddress>("storage", NodeType::DISTRIBUTOR, 0)), - _message_bus(message_bus) + _message_bus(message_bus), + _route(_storage_address->to_mbus_route()) { } @@ -32,7 +33,7 @@ DocumentApiMessageBusBmFeedHandler::~DocumentApiMessageBusBmFeedHandler() = defa void DocumentApiMessageBusBmFeedHandler::send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& pending_tracker) { - _message_bus.send_msg(std::move(msg), _storage_address->getRoute(), pending_tracker); + _message_bus.send_msg(std::move(msg), _route, pending_tracker); } void diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h index 42bc61e587e..52e0b89007f 100644 --- a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h @@ -3,6 +3,7 @@ #pragma once #include "i_bm_feed_handler.h" +#include <vespa/messagebus/routing/route.h> namespace document { class DocumentTypeRepo; } namespace documentapi { class DocumentMessage; }; @@ -21,6 +22,7 @@ class DocumentApiMessageBusBmFeedHandler : public IBmFeedHandler vespalib::string _name; std::unique_ptr<storage::api::StorageMessageAddress> _storage_address; BmMessageBus& _message_bus; + mbus::Route _route; void send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& tracker); public: DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus); diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp index 731b90888ea..18c1b979895 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp @@ -22,7 +22,8 @@ StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBu _name(vespalib::string("StorageApiMessageBusBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), _distributor(distributor), _storage_address(std::make_unique<StorageMessageAddress>("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)), - _message_bus(message_bus) + _message_bus(message_bus), + _route(_storage_address->to_mbus_route()) { } @@ -33,7 +34,7 @@ StorageApiMessageBusBmFeedHandler::send_msg(std::shared_ptr<storage::api::Storag { cmd->setSourceIndex(0); auto msg = std::make_unique<storage::mbusprot::StorageCommand>(cmd); - _message_bus.send_msg(std::move(msg), _storage_address->getRoute(), pending_tracker); + _message_bus.send_msg(std::move(msg), _route, pending_tracker); } void diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h index 82447e1e873..6925053ad43 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h @@ -3,6 +3,7 @@ #pragma once #include "i_bm_feed_handler.h" +#include <vespa/messagebus/routing/route.h> namespace document { class DocumentTypeRepo; } namespace documentapi { class DocumentMessage; }; @@ -25,6 +26,7 @@ class StorageApiMessageBusBmFeedHandler : public IBmFeedHandler bool _distributor; std::unique_ptr<storage::api::StorageMessageAddress> _storage_address; BmMessageBus& _message_bus; + mbus::Route _route; void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); public: StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 33f50b7934e..c296f215c8c 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -584,7 +584,7 @@ CommunicationManager::sendCommand( cmd->setRetryEnabled(false); cmd->setTimeRemaining(msg->getTimeout()); cmd->setTrace(msg->steal_trace()); - sendMessageBusMessage(msg, std::move(cmd), address.getRoute()); + sendMessageBusMessage(msg, std::move(cmd), address.to_mbus_route()); } break; } @@ -603,7 +603,7 @@ CommunicationManager::sendCommand( std::lock_guard lock(_messageBusSentLock); _messageBusSent[msg->getMsgId()] = msg; } - sendMessageBusMessage(msg, std::move(mbusMsg), address.getRoute()); + sendMessageBusMessage(msg, std::move(mbusMsg), address.to_mbus_route()); break; } else { LOGBM(warning, "This type of message can't be sent via messagebus"); diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp index 7cb61e6d0a8..16a9b26a754 100644 --- a/storage/src/vespa/storage/visiting/visitor.cpp +++ b/storage/src/vespa/storage/visiting/visitor.cpp @@ -222,7 +222,7 @@ Visitor::sendMessage(documentapi::DocumentMessage::UP cmd) { assert(cmd.get()); if (!isRunning()) return; - cmd->setRoute(_dataDestination->getRoute()); + cmd->setRoute(*_dataDestination); cmd->setPriority(_documentPriority); @@ -291,7 +291,7 @@ Visitor::sendInfoMessage(documentapi::VisitorInfoMessage::UP cmd) if (!isRunning()) return; if (_controlDestination->toString().length()) { - cmd->setRoute(_controlDestination->getRoute()); + cmd->setRoute(*_controlDestination); cmd->setPriority(_documentPriority); cmd->setTimeRemaining(std::chrono::milliseconds(_visitorInfoTimeout.getTime())); auto& msgMeta = _visitorTarget.insertMessage(std::move(cmd)); @@ -554,8 +554,8 @@ Visitor::start(api::VisitorId id, api::StorageMessage::Id cmdId, void Visitor::attach(std::shared_ptr<api::StorageCommand> initiatingCmd, - const api::StorageMessageAddress& controlAddress, - const api::StorageMessageAddress& dataAddress, + const mbus::Route& controlAddress, + const mbus::Route& dataAddress, framework::MilliSecTime timeout) { _priority = initiatingCmd->getPriority(); @@ -569,9 +569,8 @@ Visitor::attach(std::shared_ptr<api::StorageCommand> initiatingCmd, _traceLevel = _initiatingCmd->getTrace().getLevel(); { // Set new address - _controlDestination.reset( - new api::StorageMessageAddress(controlAddress)); - _dataDestination.reset(new api::StorageMessageAddress(dataAddress)); + _controlDestination = std::make_unique<mbus::Route>(controlAddress); + _dataDestination = std::make_unique<mbus::Route>(dataAddress); } LOG(debug, "Visitor '%s' has control destination %s and data " "destination %s.", diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h index 8a1f675a4c5..8eb02e6ccfc 100644 --- a/storage/src/vespa/storage/visiting/visitor.h +++ b/storage/src/vespa/storage/visiting/visitor.h @@ -330,8 +330,8 @@ protected: documentapi::Priority::Value _documentPriority; std::string _id; - std::unique_ptr<api::StorageMessageAddress> _controlDestination; - std::unique_ptr<api::StorageMessageAddress> _dataDestination; + std::unique_ptr<mbus::Route> _controlDestination; + std::unique_ptr<mbus::Route> _dataDestination; std::shared_ptr<document::select::Node> _documentSelection; std::string _documentSelectionString; vdslib::VisitorStatistics _visitorStatistics; @@ -355,10 +355,12 @@ public: framework::MicroSecTime getStartTime() const { return _startTime; } api::VisitorId getVisitorId() const { return _visitorId; } const std::string& getVisitorName() const { return _id; } - const api::StorageMessageAddress* getControlDestination() const - { return _controlDestination.get(); } // Can't be null if attached - const api::StorageMessageAddress* getDataDestination() const - { return _dataDestination.get(); } // Can't be null if attached + const mbus::Route* getControlDestination() const { + return _controlDestination.get(); // Can't be null if attached + } + const mbus::Route* getDataDestination() const { + return _dataDestination.get(); // Can't be null if attached + } void setMaxPending(unsigned int maxPending) { _visitorOptions._maxPending = maxPending; } @@ -471,8 +473,8 @@ public: documentapi::Priority::Value); void attach(std::shared_ptr<api::StorageCommand> initiatingCmd, - const api::StorageMessageAddress& controlAddress, - const api::StorageMessageAddress& dataAddress, + const mbus::Route& controlAddress, + const mbus::Route& dataAddress, framework::MilliSecTime timeout); void handleDocumentApiReply(mbus::Reply::UP reply, diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index 50f7e76b149..2839d3566aa 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -382,19 +382,18 @@ VisitorThread::createVisitor(vespalib::stringref libName, } namespace { - std::unique_ptr<api::StorageMessageAddress> - getDataAddress(const api::CreateVisitorCommand& cmd) - { - return std::make_unique<api::StorageMessageAddress>( - mbus::Route::parse(cmd.getDataDestination())); - } - std::unique_ptr<api::StorageMessageAddress> - getControlAddress(const api::CreateVisitorCommand& cmd) - { - return std::make_unique<api::StorageMessageAddress>( - mbus::Route::parse(cmd.getControlDestination())); - } +std::unique_ptr<mbus::Route> +getDataAddress(const api::CreateVisitorCommand& cmd) +{ + return std::make_unique<mbus::Route>(mbus::Route::parse(cmd.getDataDestination())); +} + +std::unique_ptr<mbus::Route> +getControlAddress(const api::CreateVisitorCommand& cmd) +{ + return std::make_unique<mbus::Route>(mbus::Route::parse(cmd.getControlDestination())); +} void validateDocumentSelection(const document::DocumentTypeRepo& repo, @@ -424,8 +423,8 @@ VisitorThread::onCreateVisitor( assert(_currentlyRunningVisitor == _visitors.end()); ReturnCode result(ReturnCode::OK); std::unique_ptr<document::select::Node> docSelection; - std::unique_ptr<api::StorageMessageAddress> controlAddress; - std::unique_ptr<api::StorageMessageAddress> dataAddress; + std::unique_ptr<mbus::Route> controlAddress; + std::unique_ptr<mbus::Route> dataAddress; std::shared_ptr<Visitor> visitor; do { // If no buckets are specified, fail command diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp index 68eb4a90c9f..e413a62ae39 100644 --- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp +++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp @@ -117,7 +117,7 @@ namespace { TEST_F(StorageProtocolTest, testAddress50) { StorageMessageAddress address("foo", lib::NodeType::STORAGE, 3); EXPECT_EQ(vespalib::string("storage/cluster.foo/storage/3/default"), - address.getRoute().toString()); + address.to_mbus_route().toString()); } template<typename Command> std::shared_ptr<Command> diff --git a/storageapi/src/tests/messageapi/storage_message_address_test.cpp b/storageapi/src/tests/messageapi/storage_message_address_test.cpp index 55adcf46c0d..f7f254e9119 100644 --- a/storageapi/src/tests/messageapi/storage_message_address_test.cpp +++ b/storageapi/src/tests/messageapi/storage_message_address_test.cpp @@ -22,17 +22,17 @@ TEST(StorageMessageAddressTest, storage_hash_covers_all_expected_fields) { hash_of("foo", lib::NodeType::DISTRIBUTOR, 0)); EXPECT_EQ(hash_of("foo", lib::NodeType::STORAGE, 123), hash_of("foo", lib::NodeType::STORAGE, 123)); + EXPECT_EQ(hash_of("foo", lib::NodeType::STORAGE, 0), + hash_of("bar", lib::NodeType::STORAGE, 0)); // These tests are all true with extremely high probability, though they do // depend on a hash function that may inherently cause collisions. EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), - hash_of("bar", lib::NodeType::STORAGE, 0)); - EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), hash_of("foo", lib::NodeType::DISTRIBUTOR, 0)); EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), hash_of("foo", lib::NodeType::STORAGE, 1)); - EXPECT_EQ(112u, sizeof(StorageMessageAddress)); + EXPECT_EQ(88u, sizeof(StorageMessageAddress)); EXPECT_EQ(64u, sizeof(StorageMessage)); EXPECT_EQ(16u, sizeof(mbus::Trace)); } diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp index 16f35b8d939..9c5df379d22 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp @@ -139,15 +139,6 @@ MessageType::print(std::ostream& out, bool verbose, const std::string& indent) c out << ")"; } -StorageMessageAddress::StorageMessageAddress(const mbus::Route& route) - : _route(route), - _cluster(), - _precomputed_storage_hash(0), - _type(nullptr), - _protocol(DOCUMENT), - _index(0xFFFF) -{ } - std::ostream & operator << (std::ostream & os, const StorageMessageAddress & addr) { return os << addr.toString(); } @@ -160,39 +151,45 @@ createAddress(vespalib::stringref cluster, const lib::NodeType& type, uint16_t i return os.str(); } +size_t +calculate_node_hash(const lib::NodeType& type, uint16_t index) +{ + uint16_t buf[] = { type, index }; + return vespalib::hashValue(&buf, sizeof(buf)); +} + // TODO we ideally want this removed. Currently just in place to support usage as map key when emplacement not available StorageMessageAddress::StorageMessageAddress() - : _route(), - _cluster(), + : _cluster(), _precomputed_storage_hash(0), _type(nullptr), _protocol(Protocol::STORAGE), _index(0) {} - StorageMessageAddress::StorageMessageAddress(vespalib::stringref cluster, const lib::NodeType& type, uint16_t index, Protocol protocol) - : _route(), - _cluster(cluster), - _precomputed_storage_hash(0), + : _cluster(cluster), + _precomputed_storage_hash(calculate_node_hash(type, index)), _type(&type), _protocol(protocol), _index(index) { - std::vector<mbus::IHopDirective::SP> directives; - auto address_as_str = createAddress(cluster, type, index); - // We reuse the string representation and pass it to vespalib's hashValue instead of - // explicitly combining a running hash over the individual fields. This is because - // hashValue internally uses xxhash, which offers great dispersion of bits even for - // minimal changes in the input (such as single bit differences in the index). - _precomputed_storage_hash = vespalib::hashValue(address_as_str.data(), address_as_str.size()); - directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(std::move(address_as_str))); - _route.addHop(mbus::Hop(std::move(directives), false)); } StorageMessageAddress::~StorageMessageAddress() = default; +mbus::Route +StorageMessageAddress::to_mbus_route() const +{ + mbus::Route result; + auto address_as_str = createAddress(_cluster, *_type, _index); + std::vector<mbus::IHopDirective::SP> directives; + directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(std::move(address_as_str))); + result.addHop(mbus::Hop(std::move(directives), false)); + return result; +} + uint16_t StorageMessageAddress::getIndex() const { @@ -251,7 +248,7 @@ StorageMessageAddress::print(vespalib::asciistream & out) const out << "Document protocol"; } if (!_type) { - out << ", " << _route.toString() << ")"; + out << ", " << to_mbus_route().toString() << ")"; } else { out << ", cluster " << _cluster << ", nodetype " << *_type << ", index " << _index << ")"; diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h index a2227246a52..98552e473c1 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h @@ -266,7 +266,6 @@ public: enum Protocol { STORAGE, DOCUMENT }; private: - mbus::Route _route; vespalib::string _cluster; // Used for internal VDS addresses only size_t _precomputed_storage_hash; @@ -276,7 +275,6 @@ private: public: StorageMessageAddress(); // Only to be used when transient default ctor semantics are needed by containers - StorageMessageAddress(const mbus::Route& route); StorageMessageAddress(vespalib::stringref clusterName, const lib::NodeType& type, uint16_t index, Protocol protocol = STORAGE); @@ -284,13 +282,13 @@ public: void setProtocol(Protocol p) { _protocol = p; } - const mbus::Route& getRoute() const { return _route; } + mbus::Route to_mbus_route() const; Protocol getProtocol() const { return _protocol; } uint16_t getIndex() const; const lib::NodeType& getNodeType() const; const vespalib::string& getCluster() const; - // Returns precomputed hash over <cluster, type, index> tuple. Other fields not included. + // Returns precomputed hash over <type, index> pair. Other fields not included. [[nodiscard]] size_t internal_storage_hash() const noexcept { return _precomputed_storage_hash; } |