summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@verizonmedia.com> 2020-09-22 12:24:12 +0000
committer Tor Brede Vekterli <vekterli@verizonmedia.com> 2020-09-22 12:24:12 +0000
commit 5afff095ad3ba0b0811b5788ebaeb3c84832d7f6 (patch)
tree 175a944dd98e9f1ffd961642c12cb75b44ce2610
parent 1fa18524f943ae986f6526db01945d41de2f9d6d (diff)
Avoid address stringification in common lookup path
Precompute internal address hash over <cluster, type, index> tuple. No other fields are included in the hash, as this is only used for storage API lookups. Remove automatic resending field from address, as we never use MBus resending functionality in the backend communication protocols.
-rw-r--r-- storage/src/vespa/storage/storageserver/communicationmanager.cpp | 4
-rw-r--r-- storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp | 28
-rw-r--r-- storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h | 37
-rw-r--r-- storageapi/src/tests/messageapi/CMakeLists.txt | 5
-rw-r--r-- storageapi/src/tests/messageapi/storage_message_address_test.cpp | 36
-rw-r--r-- storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp | 28
-rw-r--r-- storageapi/src/vespa/storageapi/messageapi/storagemessage.h | 10
7 files changed, 106 insertions, 42 deletions
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 5471d66a864..99fdb97e435 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -581,7 +581,7 @@ CommunicationManager::sendCommand(
auto cmd = std::make_unique<mbusprot::StorageCommand>(msg);
cmd->setContext(mbus::Context(msg->getMsgId()));
- cmd->setRetryEnabled(address.retryEnabled());
+ cmd->setRetryEnabled(false);
cmd->setTimeRemaining(msg->getTimeout());
cmd->setTrace(msg->getTrace());
sendMessageBusMessage(msg, std::move(cmd), address.getRoute());
@@ -597,7 +597,7 @@ CommunicationManager::sendCommand(
if (mbusMsg) {
MBUS_TRACE(msg->getTrace(), 7, "Communication manager: Converted OK");
mbusMsg->setTrace(msg->getTrace());
- mbusMsg->setRetryEnabled(address.retryEnabled());
+ mbusMsg->setRetryEnabled(false);
{
vespalib::LockGuard lock(_messageBusSentLock);
diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
index 5241ec6f769..6bcb154aed5 100644
--- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
@@ -33,9 +33,9 @@ CachingRpcTargetResolver::address_to_slobrok_id(const api::StorageMessageAddress
}
std::shared_ptr<RpcTarget>
-CachingRpcTargetResolver::lookup_target(const vespalib::string& slobrok_id, uint32_t curr_slobrok_gen) {
+CachingRpcTargetResolver::lookup_target(const api::StorageMessageAddress& address, uint32_t curr_slobrok_gen) {
std::shared_lock lock(_targets_rwmutex);
- auto itr = _targets.find(slobrok_id);
+ auto itr = _targets.find(address);
if ((itr != _targets.end())
&& itr->second->_target->is_valid()
&& (itr->second->_slobrok_gen == curr_slobrok_gen)) {
@@ -45,18 +45,19 @@ CachingRpcTargetResolver::lookup_target(const vespalib::string& slobrok_id, uint
}
std::shared_ptr<RpcTarget>
-CachingRpcTargetResolver::consider_update_target(const vespalib::string& slobrok_id,
+CachingRpcTargetResolver::consider_update_target(const api::StorageMessageAddress& address,
const vespalib::string& connection_spec,
uint32_t curr_slobrok_gen,
[[maybe_unused]] const UniqueLock& targets_lock) {
// If address has the same spec as the existing target, just reuse it.
- auto itr = _targets.find(slobrok_id);
+ auto itr = _targets.find(address);
if ((itr != _targets.end())
&& (itr->second->_target->is_valid())
&& (itr->second->_spec == connection_spec))
{
LOG(debug, "Updating existing mapping '%s' -> '%s' (gen %u) to gen %u",
- slobrok_id.c_str(), connection_spec.c_str(), itr->second->_slobrok_gen, curr_slobrok_gen);
+ address.toString().c_str(), connection_spec.c_str(),
+ itr->second->_slobrok_gen, curr_slobrok_gen);
itr->second->_slobrok_gen = curr_slobrok_gen;
return itr->second;
}
@@ -64,26 +65,27 @@ CachingRpcTargetResolver::consider_update_target(const vespalib::string& slobrok
}
std::shared_ptr<RpcTarget>
-CachingRpcTargetResolver::insert_new_target_mapping(const vespalib::string& slobrok_id,
+CachingRpcTargetResolver::insert_new_target_mapping(const api::StorageMessageAddress& address,
const vespalib::string& connection_spec,
uint32_t curr_slobrok_gen,
[[maybe_unused]] const UniqueLock& targets_lock) {
auto target = _target_factory.make_target(connection_spec, curr_slobrok_gen); // TODO expensive inside lock?
assert(target);
std::shared_ptr<RpcTarget> rpc_target(std::move(target));
- _targets[slobrok_id] = rpc_target;
- LOG(debug, "Added mapping '%s' -> '%s' at gen %u", slobrok_id.c_str(), connection_spec.c_str(), curr_slobrok_gen);
+ // TODO emplacement (with replace) semantics to avoid need for default constructed K/V
+ _targets[address] = rpc_target;
+ LOG(debug, "Added mapping '%s' -> '%s' at gen %u", address.toString().c_str(),
+ connection_spec.c_str(), curr_slobrok_gen);
return rpc_target;
}
std::shared_ptr<RpcTarget>
CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& address) {
- // TODO or map directly from address to target instead of going via stringification? Needs hashing, if so.
- auto slobrok_id = address_to_slobrok_id(address);
const uint32_t curr_slobrok_gen = _slobrok_mirror.updates();
- if (auto result = lookup_target(slobrok_id, curr_slobrok_gen)) {
+ if (auto result = lookup_target(address, curr_slobrok_gen)) {
return result;
}
+ auto slobrok_id = address_to_slobrok_id(address);
auto specs = _slobrok_mirror.lookup(slobrok_id); // FIXME string type mismatch; implicit conv!
if (specs.empty()) {
LOG(debug, "Found no mapping for '%s'", slobrok_id.c_str());
@@ -95,10 +97,10 @@ CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& a
assert(specs.size() == 1);
const auto& connection_spec = specs[0].second;
std::unique_lock lock(_targets_rwmutex);
- if (auto result = consider_update_target(slobrok_id, connection_spec, curr_slobrok_gen, lock)) {
+ if (auto result = consider_update_target(address, connection_spec, curr_slobrok_gen, lock)) {
return result;
}
- return insert_new_target_mapping(slobrok_id, connection_spec, curr_slobrok_gen, lock);
+ return insert_new_target_mapping(address, connection_spec, curr_slobrok_gen, lock);
}
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h
index cf94f7545bc..52b505d5476 100644
--- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h
+++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h
@@ -3,38 +3,44 @@
#include "rpc_target.h"
#include "rpc_target_factory.h"
+#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <memory>
#include <shared_mutex>
namespace slobrok::api { class IMirrorAPI; }
-namespace storage {
-
-namespace api { class StorageMessageAddress; }
-
-namespace rpc {
+namespace storage::rpc {
/**
* Class that resolves and caches rpc targets based on StorageMessageAddress that is mapped to slobrok id,
* with lookup in a slobrok mirror.
*/
class CachingRpcTargetResolver {
-private:
- const slobrok::api::IMirrorAPI& _slobrok_mirror;
- const RpcTargetFactory& _target_factory;
+
+ struct AddressInternalHasher {
+ size_t operator()(const api::StorageMessageAddress& addr) const noexcept {
+ return addr.internal_storage_hash();
+ }
+ };
+ using TargetHashMap = vespalib::hash_map<api::StorageMessageAddress,
+ std::shared_ptr<RpcTarget>,
+ AddressInternalHasher>;
using UniqueLock = std::unique_lock<std::shared_mutex>;
- mutable std::shared_mutex _targets_rwmutex;
- // TODO LRU? Size cap?
- vespalib::hash_map<vespalib::string, std::shared_ptr<RpcTarget>> _targets;
- std::shared_ptr<RpcTarget> lookup_target(const vespalib::string& slobrok_id, uint32_t curr_slobrok_gen);
- std::shared_ptr<RpcTarget> consider_update_target(const vespalib::string& slobrok_id,
+ const slobrok::api::IMirrorAPI& _slobrok_mirror;
+ const RpcTargetFactory& _target_factory;
+ mutable std::shared_mutex _targets_rwmutex;
+ TargetHashMap _targets; // TODO LRU? Size cap?
+
+ std::shared_ptr<RpcTarget> lookup_target(const api::StorageMessageAddress& address,
+ uint32_t curr_slobrok_gen);
+ std::shared_ptr<RpcTarget> consider_update_target(const api::StorageMessageAddress& address,
const vespalib::string& connection_spec,
uint32_t curr_slobrok_gen,
const UniqueLock& targets_lock);
- std::shared_ptr<RpcTarget> insert_new_target_mapping(const vespalib::string& slobrok_id,
+ std::shared_ptr<RpcTarget> insert_new_target_mapping(const api::StorageMessageAddress& address,
const vespalib::string& connection_spec,
uint32_t curr_slobrok_gen,
const UniqueLock& targets_lock);
@@ -49,5 +55,4 @@ public:
std::shared_ptr<RpcTarget> resolve_rpc_target(const api::StorageMessageAddress& address);
};
-} // rpc
-} // storage
+} // storage::rpc
diff --git a/storageapi/src/tests/messageapi/CMakeLists.txt b/storageapi/src/tests/messageapi/CMakeLists.txt
index 4833dc45acf..50f0b306191 100644
--- a/storageapi/src/tests/messageapi/CMakeLists.txt
+++ b/storageapi/src/tests/messageapi/CMakeLists.txt
@@ -1,5 +1,8 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_library(storageapi_testmessageapi INTERFACE
+vespa_add_library(storageapi_testmessageapi
SOURCES
+ storage_message_address_test.cpp
DEPENDS
+ storageapi
+ GTest::GTest
)
diff --git a/storageapi/src/tests/messageapi/storage_message_address_test.cpp b/storageapi/src/tests/messageapi/storage_message_address_test.cpp
new file mode 100644
index 00000000000..c340cba4b28
--- /dev/null
+++ b/storageapi/src/tests/messageapi/storage_message_address_test.cpp
@@ -0,0 +1,36 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/storageapi/messageapi/storagemessage.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace ::testing;
+
+namespace storage::api {
+
+namespace {
+
+size_t hash_of(vespalib::stringref cluster, const lib::NodeType& type, uint16_t index) {
+ return StorageMessageAddress(cluster, type, index).internal_storage_hash();
+}
+
+}
+
+TEST(StorageMessageAddressTest, storage_hash_covers_all_expected_fields) {
+ EXPECT_EQ(hash_of("foo", lib::NodeType::STORAGE, 0),
+ hash_of("foo", lib::NodeType::STORAGE, 0));
+ EXPECT_EQ(hash_of("foo", lib::NodeType::DISTRIBUTOR, 0),
+ hash_of("foo", lib::NodeType::DISTRIBUTOR, 0));
+ EXPECT_EQ(hash_of("foo", lib::NodeType::STORAGE, 123),
+ hash_of("foo", lib::NodeType::STORAGE, 123));
+
+ // These tests are all true with extremely high probability, though they do
+ // depend on a hash function that may inherently cause collisions.
+ EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0),
+ hash_of("bar", lib::NodeType::STORAGE, 0));
+ EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0),
+ hash_of("foo", lib::NodeType::DISTRIBUTOR, 0));
+ EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0),
+ hash_of("foo", lib::NodeType::STORAGE, 1));
+}
+
+} // storage::api
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
index d1bd24f5087..8276587834a 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
+++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp
@@ -6,6 +6,7 @@
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/vespalib/stllike/hash_fun.h>
#include <sstream>
#include <cassert>
#include <atomic>
@@ -141,8 +142,8 @@ MessageType::print(std::ostream& out, bool verbose, const std::string& indent) c
StorageMessageAddress::StorageMessageAddress(const mbus::Route& route)
: _route(route),
- _retryEnabled(false),
_protocol(DOCUMENT),
+ _precomputed_storage_hash(0),
_cluster(""),
_type(nullptr),
_index(0xFFFF)
@@ -160,17 +161,34 @@ createAddress(vespalib::stringref cluster, const lib::NodeType& type, uint16_t i
return os.str();
}
+// TODO we ideally want this removed. Currently just in place to support usage as map key when emplacement not available
+StorageMessageAddress::StorageMessageAddress()
+ : _route(),
+ _protocol(Protocol::STORAGE),
+ _precomputed_storage_hash(0),
+ _cluster(),
+ _type(nullptr),
+ _index(0)
+{}
+
+
StorageMessageAddress::StorageMessageAddress(vespalib::stringref cluster, const lib::NodeType& type,
uint16_t index, Protocol protocol)
: _route(),
- _retryEnabled(false),
_protocol(protocol),
+ _precomputed_storage_hash(0),
_cluster(cluster),
_type(&type),
_index(index)
{
std::vector<mbus::IHopDirective::SP> directives;
- directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(createAddress(cluster, type, index)));
+ auto address_as_str = createAddress(cluster, type, index);
+ // We reuse the string representation and pass it to vespalib's hashValue instead of
+ // explicitly combining a running hash over the individual fields. This is because
+ // hashValue internally uses xxhash, which offers great dispersion of bits even for
+ // minimal changes in the input (such as single bit differences in the index).
+ _precomputed_storage_hash = vespalib::hashValue(address_as_str.data(), address_as_str.size());
+ directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(std::move(address_as_str)));
_route.addHop(mbus::Hop(std::move(directives), false));
}
@@ -207,7 +225,6 @@ bool
StorageMessageAddress::operator==(const StorageMessageAddress& other) const
{
if (_protocol != other._protocol) return false;
- if (_retryEnabled != other._retryEnabled) return false;
if (_type != other._type) return false;
if (_type) {
if (_cluster != other._cluster) return false;
@@ -234,9 +251,6 @@ StorageMessageAddress::print(vespalib::asciistream & out) const
} else {
out << "Document protocol";
}
- if (_retryEnabled) {
- out << ", retry enabled";
- }
if (!_type) {
out << ", " << _route.toString() << ")";
} else {
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
index 415bd7717f2..85d4e072171 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
+++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h
@@ -269,14 +269,15 @@ public:
private:
mbus::Route _route;
- bool _retryEnabled;
Protocol _protocol;
// Used for internal VDS addresses only
+ size_t _precomputed_storage_hash;
vespalib::string _cluster;
const lib::NodeType* _type;
uint16_t _index;
public:
+ StorageMessageAddress(); // Only to be used when transient default ctor semantics are needed by containers
StorageMessageAddress(const mbus::Route& route);
StorageMessageAddress(vespalib::stringref clusterName,
const lib::NodeType& type, uint16_t index,
@@ -284,15 +285,18 @@ public:
~StorageMessageAddress();
void setProtocol(Protocol p) { _protocol = p; }
- void enableRetry(bool enable = true) { _retryEnabled = enable; }
const mbus::Route& getRoute() const { return _route; }
- bool retryEnabled() const { return _retryEnabled; }
Protocol getProtocol() const { return _protocol; }
uint16_t getIndex() const;
const lib::NodeType& getNodeType() const;
const vespalib::string& getCluster() const;
+ // Returns precomputed hash over <cluster, type, index> tuple. Other fields not included.
+ [[nodiscard]] size_t internal_storage_hash() const noexcept {
+ return _precomputed_storage_hash;
+ }
+
bool operator==(const StorageMessageAddress& other) const;
vespalib::string toString() const;
friend std::ostream & operator << (std::ostream & os, const StorageMessageAddress & addr);