summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp5
-rw-r--r--searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h2
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp5
-rw-r--r--searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h2
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp4
-rw-r--r--storage/src/vespa/storage/visiting/visitor.cpp13
-rw-r--r--storage/src/vespa/storage/visiting/visitor.h18
-rw-r--r--storage/src/vespa/storage/visiting/visitorthread.cpp27
-rw-r--r--storageapi/src/tests/mbusprot/storageprotocoltest.cpp2
-rw-r--r--storageapi/src/tests/messageapi/storage_message_address_test.cpp6
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp47
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/storagemessage.h6
12 files changed, 69 insertions, 68 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp
index e6502d14347..714add169fd 100644
--- a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp
@@ -23,7 +23,8 @@ DocumentApiMessageBusBmFeedHandler::DocumentApiMessageBusBmFeedHandler(BmMessage
: IBmFeedHandler(),
_name(vespalib::string("DocumentApiMessageBusBmFeedHandler(distributor)")),
_storage_address(std::make_unique<StorageMessageAddress>("storage", NodeType::DISTRIBUTOR, 0)),
- _message_bus(message_bus)
+ _message_bus(message_bus),
+ _route(_storage_address->to_mbus_route())
{
}
@@ -32,7 +33,7 @@ DocumentApiMessageBusBmFeedHandler::~DocumentApiMessageBusBmFeedHandler() = defa
void
DocumentApiMessageBusBmFeedHandler::send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& pending_tracker)
{
- _message_bus.send_msg(std::move(msg), _storage_address->getRoute(), pending_tracker);
+ _message_bus.send_msg(std::move(msg), _route, pending_tracker);
}
void
diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h
index 42bc61e587e..52e0b89007f 100644
--- a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h
+++ b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h
@@ -3,6 +3,7 @@
#pragma once
#include "i_bm_feed_handler.h"
+#include <vespa/messagebus/routing/route.h>
namespace document { class DocumentTypeRepo; }
namespace documentapi { class DocumentMessage; };
@@ -21,6 +22,7 @@ class DocumentApiMessageBusBmFeedHandler : public IBmFeedHandler
vespalib::string _name;
std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
BmMessageBus& _message_bus;
+ mbus::Route _route;
void send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& tracker);
public:
DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus);
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp
index 731b90888ea..18c1b979895 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp
@@ -22,7 +22,8 @@ StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBu
_name(vespalib::string("StorageApiMessageBusBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
_distributor(distributor),
_storage_address(std::make_unique<StorageMessageAddress>("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)),
- _message_bus(message_bus)
+ _message_bus(message_bus),
+ _route(_storage_address->to_mbus_route())
{
}
@@ -33,7 +34,7 @@ StorageApiMessageBusBmFeedHandler::send_msg(std::shared_ptr<storage::api::Storag
{
cmd->setSourceIndex(0);
auto msg = std::make_unique<storage::mbusprot::StorageCommand>(cmd);
- _message_bus.send_msg(std::move(msg), _storage_address->getRoute(), pending_tracker);
+ _message_bus.send_msg(std::move(msg), _route, pending_tracker);
}
void
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h
index 82447e1e873..6925053ad43 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h
@@ -3,6 +3,7 @@
#pragma once
#include "i_bm_feed_handler.h"
+#include <vespa/messagebus/routing/route.h>
namespace document { class DocumentTypeRepo; }
namespace documentapi { class DocumentMessage; };
@@ -25,6 +26,7 @@ class StorageApiMessageBusBmFeedHandler : public IBmFeedHandler
bool _distributor;
std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
BmMessageBus& _message_bus;
+ mbus::Route _route;
void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
public:
StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor);
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 33f50b7934e..c296f215c8c 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -584,7 +584,7 @@ CommunicationManager::sendCommand(
cmd->setRetryEnabled(false);
cmd->setTimeRemaining(msg->getTimeout());
cmd->setTrace(msg->steal_trace());
- sendMessageBusMessage(msg, std::move(cmd), address.getRoute());
+ sendMessageBusMessage(msg, std::move(cmd), address.to_mbus_route());
}
break;
}
@@ -603,7 +603,7 @@ CommunicationManager::sendCommand(
std::lock_guard lock(_messageBusSentLock);
_messageBusSent[msg->getMsgId()] = msg;
}
- sendMessageBusMessage(msg, std::move(mbusMsg), address.getRoute());
+ sendMessageBusMessage(msg, std::move(mbusMsg), address.to_mbus_route());
break;
} else {
LOGBM(warning, "This type of message can't be sent via messagebus");
diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp
index 7cb61e6d0a8..16a9b26a754 100644
--- a/storage/src/vespa/storage/visiting/visitor.cpp
+++ b/storage/src/vespa/storage/visiting/visitor.cpp
@@ -222,7 +222,7 @@ Visitor::sendMessage(documentapi::DocumentMessage::UP cmd)
{
assert(cmd.get());
if (!isRunning()) return;
- cmd->setRoute(_dataDestination->getRoute());
+ cmd->setRoute(*_dataDestination);
cmd->setPriority(_documentPriority);
@@ -291,7 +291,7 @@ Visitor::sendInfoMessage(documentapi::VisitorInfoMessage::UP cmd)
if (!isRunning()) return;
if (_controlDestination->toString().length()) {
- cmd->setRoute(_controlDestination->getRoute());
+ cmd->setRoute(*_controlDestination);
cmd->setPriority(_documentPriority);
cmd->setTimeRemaining(std::chrono::milliseconds(_visitorInfoTimeout.getTime()));
auto& msgMeta = _visitorTarget.insertMessage(std::move(cmd));
@@ -554,8 +554,8 @@ Visitor::start(api::VisitorId id, api::StorageMessage::Id cmdId,
void
Visitor::attach(std::shared_ptr<api::StorageCommand> initiatingCmd,
- const api::StorageMessageAddress& controlAddress,
- const api::StorageMessageAddress& dataAddress,
+ const mbus::Route& controlAddress,
+ const mbus::Route& dataAddress,
framework::MilliSecTime timeout)
{
_priority = initiatingCmd->getPriority();
@@ -569,9 +569,8 @@ Visitor::attach(std::shared_ptr<api::StorageCommand> initiatingCmd,
_traceLevel = _initiatingCmd->getTrace().getLevel();
{
// Set new address
- _controlDestination.reset(
- new api::StorageMessageAddress(controlAddress));
- _dataDestination.reset(new api::StorageMessageAddress(dataAddress));
+ _controlDestination = std::make_unique<mbus::Route>(controlAddress);
+ _dataDestination = std::make_unique<mbus::Route>(dataAddress);
}
LOG(debug, "Visitor '%s' has control destination %s and data "
"destination %s.",
diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h
index 8a1f675a4c5..8eb02e6ccfc 100644
--- a/storage/src/vespa/storage/visiting/visitor.h
+++ b/storage/src/vespa/storage/visiting/visitor.h
@@ -330,8 +330,8 @@ protected:
documentapi::Priority::Value _documentPriority;
std::string _id;
- std::unique_ptr<api::StorageMessageAddress> _controlDestination;
- std::unique_ptr<api::StorageMessageAddress> _dataDestination;
+ std::unique_ptr<mbus::Route> _controlDestination;
+ std::unique_ptr<mbus::Route> _dataDestination;
std::shared_ptr<document::select::Node> _documentSelection;
std::string _documentSelectionString;
vdslib::VisitorStatistics _visitorStatistics;
@@ -355,10 +355,12 @@ public:
framework::MicroSecTime getStartTime() const { return _startTime; }
api::VisitorId getVisitorId() const { return _visitorId; }
const std::string& getVisitorName() const { return _id; }
- const api::StorageMessageAddress* getControlDestination() const
- { return _controlDestination.get(); } // Can't be null if attached
- const api::StorageMessageAddress* getDataDestination() const
- { return _dataDestination.get(); } // Can't be null if attached
+ const mbus::Route* getControlDestination() const {
+ return _controlDestination.get(); // Can't be null if attached
+ }
+ const mbus::Route* getDataDestination() const {
+ return _dataDestination.get(); // Can't be null if attached
+ }
void setMaxPending(unsigned int maxPending)
{ _visitorOptions._maxPending = maxPending; }
@@ -471,8 +473,8 @@ public:
documentapi::Priority::Value);
void attach(std::shared_ptr<api::StorageCommand> initiatingCmd,
- const api::StorageMessageAddress& controlAddress,
- const api::StorageMessageAddress& dataAddress,
+ const mbus::Route& controlAddress,
+ const mbus::Route& dataAddress,
framework::MilliSecTime timeout);
void handleDocumentApiReply(mbus::Reply::UP reply,
diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp
index 50f7e76b149..2839d3566aa 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.cpp
+++ b/storage/src/vespa/storage/visiting/visitorthread.cpp
@@ -382,19 +382,18 @@ VisitorThread::createVisitor(vespalib::stringref libName,
}
namespace {
- std::unique_ptr<api::StorageMessageAddress>
- getDataAddress(const api::CreateVisitorCommand& cmd)
- {
- return std::make_unique<api::StorageMessageAddress>(
- mbus::Route::parse(cmd.getDataDestination()));
- }
- std::unique_ptr<api::StorageMessageAddress>
- getControlAddress(const api::CreateVisitorCommand& cmd)
- {
- return std::make_unique<api::StorageMessageAddress>(
- mbus::Route::parse(cmd.getControlDestination()));
- }
+std::unique_ptr<mbus::Route>
+getDataAddress(const api::CreateVisitorCommand& cmd)
+{
+ return std::make_unique<mbus::Route>(mbus::Route::parse(cmd.getDataDestination()));
+}
+
+std::unique_ptr<mbus::Route>
+getControlAddress(const api::CreateVisitorCommand& cmd)
+{
+ return std::make_unique<mbus::Route>(mbus::Route::parse(cmd.getControlDestination()));
+}
void
validateDocumentSelection(const document::DocumentTypeRepo& repo,
@@ -424,8 +423,8 @@ VisitorThread::onCreateVisitor(
assert(_currentlyRunningVisitor == _visitors.end());
ReturnCode result(ReturnCode::OK);
std::unique_ptr<document::select::Node> docSelection;
- std::unique_ptr<api::StorageMessageAddress> controlAddress;
- std::unique_ptr<api::StorageMessageAddress> dataAddress;
+ std::unique_ptr<mbus::Route> controlAddress;
+ std::unique_ptr<mbus::Route> dataAddress;
std::shared_ptr<Visitor> visitor;
do {
// If no buckets are specified, fail command
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
index 68eb4a90c9f..e413a62ae39 100644
--- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
+++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
@@ -117,7 +117,7 @@ namespace {
TEST_F(StorageProtocolTest, testAddress50) {
StorageMessageAddress address("foo", lib::NodeType::STORAGE, 3);
EXPECT_EQ(vespalib::string("storage/cluster.foo/storage/3/default"),
- address.getRoute().toString());
+ address.to_mbus_route().toString());
}
template<typename Command> std::shared_ptr<Command>
diff --git a/storageapi/src/tests/messageapi/storage_message_address_test.cpp b/storageapi/src/tests/messageapi/storage_message_address_test.cpp
index 55adcf46c0d..f7f254e9119 100644
--- a/storageapi/src/tests/messageapi/storage_message_address_test.cpp
+++ b/storageapi/src/tests/messageapi/storage_message_address_test.cpp
@@ -22,17 +22,17 @@ TEST(StorageMessageAddressTest, storage_hash_covers_all_expected_fields) {
hash_of("foo", lib::NodeType::DISTRIBUTOR, 0));
EXPECT_EQ(hash_of("foo", lib::NodeType::STORAGE, 123),
hash_of("foo", lib::NodeType::STORAGE, 123));
+ EXPECT_EQ(hash_of("foo", lib::NodeType::STORAGE, 0),
+ hash_of("bar", lib::NodeType::STORAGE, 0));
// These tests are all true with extremely high probability, though they do
// depend on a hash function that may inherently cause collisions.
EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0),
- hash_of("bar", lib::NodeType::STORAGE, 0));
- EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0),
hash_of("foo", lib::NodeType::DISTRIBUTOR, 0));
EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0),
hash_of("foo", lib::NodeType::STORAGE, 1));
- EXPECT_EQ(112u, sizeof(StorageMessageAddress));
+ EXPECT_EQ(88u, sizeof(StorageMessageAddress));
EXPECT_EQ(64u, sizeof(StorageMessage));
EXPECT_EQ(16u, sizeof(mbus::Trace));
}
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
index 16f35b8d939..9c5df379d22 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
+++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
@@ -139,15 +139,6 @@ MessageType::print(std::ostream& out, bool verbose, const std::string& indent) c
out << ")";
}
-StorageMessageAddress::StorageMessageAddress(const mbus::Route& route)
- : _route(route),
- _cluster(),
- _precomputed_storage_hash(0),
- _type(nullptr),
- _protocol(DOCUMENT),
- _index(0xFFFF)
-{ }
-
std::ostream & operator << (std::ostream & os, const StorageMessageAddress & addr) {
return os << addr.toString();
}
@@ -160,39 +151,45 @@ createAddress(vespalib::stringref cluster, const lib::NodeType& type, uint16_t i
return os.str();
}
+size_t
+calculate_node_hash(const lib::NodeType& type, uint16_t index)
+{
+ uint16_t buf[] = { type, index };
+ return vespalib::hashValue(&buf, sizeof(buf));
+}
+
// TODO we ideally want this removed. Currently just in place to support usage as map key when emplacement not available
StorageMessageAddress::StorageMessageAddress()
- : _route(),
- _cluster(),
+ : _cluster(),
_precomputed_storage_hash(0),
_type(nullptr),
_protocol(Protocol::STORAGE),
_index(0)
{}
-
StorageMessageAddress::StorageMessageAddress(vespalib::stringref cluster, const lib::NodeType& type,
uint16_t index, Protocol protocol)
- : _route(),
- _cluster(cluster),
- _precomputed_storage_hash(0),
+ : _cluster(cluster),
+ _precomputed_storage_hash(calculate_node_hash(type, index)),
_type(&type),
_protocol(protocol),
_index(index)
{
- std::vector<mbus::IHopDirective::SP> directives;
- auto address_as_str = createAddress(cluster, type, index);
- // We reuse the string representation and pass it to vespalib's hashValue instead of
- // explicitly combining a running hash over the individual fields. This is because
- // hashValue internally uses xxhash, which offers great dispersion of bits even for
- // minimal changes in the input (such as single bit differences in the index).
- _precomputed_storage_hash = vespalib::hashValue(address_as_str.data(), address_as_str.size());
- directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(std::move(address_as_str)));
- _route.addHop(mbus::Hop(std::move(directives), false));
}
StorageMessageAddress::~StorageMessageAddress() = default;
+mbus::Route
+StorageMessageAddress::to_mbus_route() const
+{
+ mbus::Route result;
+ auto address_as_str = createAddress(_cluster, *_type, _index);
+ std::vector<mbus::IHopDirective::SP> directives;
+ directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(std::move(address_as_str)));
+ result.addHop(mbus::Hop(std::move(directives), false));
+ return result;
+}
+
uint16_t
StorageMessageAddress::getIndex() const
{
@@ -251,7 +248,7 @@ StorageMessageAddress::print(vespalib::asciistream & out) const
out << "Document protocol";
}
if (!_type) {
- out << ", " << _route.toString() << ")";
+ out << ", " << to_mbus_route().toString() << ")";
} else {
out << ", cluster " << _cluster << ", nodetype " << *_type
<< ", index " << _index << ")";
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
index a2227246a52..98552e473c1 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
+++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
@@ -266,7 +266,6 @@ public:
enum Protocol { STORAGE, DOCUMENT };
private:
- mbus::Route _route;
vespalib::string _cluster;
// Used for internal VDS addresses only
size_t _precomputed_storage_hash;
@@ -276,7 +275,6 @@ private:
public:
StorageMessageAddress(); // Only to be used when transient default ctor semantics are needed by containers
- StorageMessageAddress(const mbus::Route& route);
StorageMessageAddress(vespalib::stringref clusterName,
const lib::NodeType& type, uint16_t index,
Protocol protocol = STORAGE);
@@ -284,13 +282,13 @@ public:
void setProtocol(Protocol p) { _protocol = p; }
- const mbus::Route& getRoute() const { return _route; }
+ mbus::Route to_mbus_route() const;
Protocol getProtocol() const { return _protocol; }
uint16_t getIndex() const;
const lib::NodeType& getNodeType() const;
const vespalib::string& getCluster() const;
- // Returns precomputed hash over <cluster, type, index> tuple. Other fields not included.
+ // Returns precomputed hash over <type, index> pair. Other fields not included.
[[nodiscard]] size_t internal_storage_hash() const noexcept {
return _precomputed_storage_hash;
}