diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-22 12:24:12 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-22 12:24:12 +0000 |
commit | 5afff095ad3ba0b0811b5788ebaeb3c84832d7f6 (patch) | |
tree | 175a944dd98e9f1ffd961642c12cb75b44ce2610 | |
parent | 1fa18524f943ae986f6526db01945d41de2f9d6d (diff) |
Avoid address stringification in common lookup path
Precompute an internal address hash over the <cluster, type, index> tuple.
No other fields are included in the hash, as the hash is only used for
storage API lookups.
Remove the automatic resending (retry-enabled) field from the address, as we never use MBus
resending functionality in the backend communication protocols. Outgoing messages now always have retry disabled.
7 files changed, 106 insertions, 42 deletions
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 5471d66a864..99fdb97e435 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -581,7 +581,7 @@ CommunicationManager::sendCommand( auto cmd = std::make_unique<mbusprot::StorageCommand>(msg); cmd->setContext(mbus::Context(msg->getMsgId())); - cmd->setRetryEnabled(address.retryEnabled()); + cmd->setRetryEnabled(false); cmd->setTimeRemaining(msg->getTimeout()); cmd->setTrace(msg->getTrace()); sendMessageBusMessage(msg, std::move(cmd), address.getRoute()); @@ -597,7 +597,7 @@ CommunicationManager::sendCommand( if (mbusMsg) { MBUS_TRACE(msg->getTrace(), 7, "Communication manager: Converted OK"); mbusMsg->setTrace(msg->getTrace()); - mbusMsg->setRetryEnabled(address.retryEnabled()); + mbusMsg->setRetryEnabled(false); { vespalib::LockGuard lock(_messageBusSentLock); diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp index 5241ec6f769..6bcb154aed5 100644 --- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp @@ -33,9 +33,9 @@ CachingRpcTargetResolver::address_to_slobrok_id(const api::StorageMessageAddress } std::shared_ptr<RpcTarget> -CachingRpcTargetResolver::lookup_target(const vespalib::string& slobrok_id, uint32_t curr_slobrok_gen) { +CachingRpcTargetResolver::lookup_target(const api::StorageMessageAddress& address, uint32_t curr_slobrok_gen) { std::shared_lock lock(_targets_rwmutex); - auto itr = _targets.find(slobrok_id); + auto itr = _targets.find(address); if ((itr != _targets.end()) && itr->second->_target->is_valid() && (itr->second->_slobrok_gen == curr_slobrok_gen)) { @@ -45,18 +45,19 @@ 
CachingRpcTargetResolver::lookup_target(const vespalib::string& slobrok_id, uint } std::shared_ptr<RpcTarget> -CachingRpcTargetResolver::consider_update_target(const vespalib::string& slobrok_id, +CachingRpcTargetResolver::consider_update_target(const api::StorageMessageAddress& address, const vespalib::string& connection_spec, uint32_t curr_slobrok_gen, [[maybe_unused]] const UniqueLock& targets_lock) { // If address has the same spec as the existing target, just reuse it. - auto itr = _targets.find(slobrok_id); + auto itr = _targets.find(address); if ((itr != _targets.end()) && (itr->second->_target->is_valid()) && (itr->second->_spec == connection_spec)) { LOG(debug, "Updating existing mapping '%s' -> '%s' (gen %u) to gen %u", - slobrok_id.c_str(), connection_spec.c_str(), itr->second->_slobrok_gen, curr_slobrok_gen); + address.toString().c_str(), connection_spec.c_str(), + itr->second->_slobrok_gen, curr_slobrok_gen); itr->second->_slobrok_gen = curr_slobrok_gen; return itr->second; } @@ -64,26 +65,27 @@ CachingRpcTargetResolver::consider_update_target(const vespalib::string& slobrok } std::shared_ptr<RpcTarget> -CachingRpcTargetResolver::insert_new_target_mapping(const vespalib::string& slobrok_id, +CachingRpcTargetResolver::insert_new_target_mapping(const api::StorageMessageAddress& address, const vespalib::string& connection_spec, uint32_t curr_slobrok_gen, [[maybe_unused]] const UniqueLock& targets_lock) { auto target = _target_factory.make_target(connection_spec, curr_slobrok_gen); // TODO expensive inside lock? 
assert(target); std::shared_ptr<RpcTarget> rpc_target(std::move(target)); - _targets[slobrok_id] = rpc_target; - LOG(debug, "Added mapping '%s' -> '%s' at gen %u", slobrok_id.c_str(), connection_spec.c_str(), curr_slobrok_gen); + // TODO emplacement (with replace) semantics to avoid need for default constructed K/V + _targets[address] = rpc_target; + LOG(debug, "Added mapping '%s' -> '%s' at gen %u", address.toString().c_str(), + connection_spec.c_str(), curr_slobrok_gen); return rpc_target; } std::shared_ptr<RpcTarget> CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& address) { - // TODO or map directly from address to target instead of going via stringification? Needs hashing, if so. - auto slobrok_id = address_to_slobrok_id(address); const uint32_t curr_slobrok_gen = _slobrok_mirror.updates(); - if (auto result = lookup_target(slobrok_id, curr_slobrok_gen)) { + if (auto result = lookup_target(address, curr_slobrok_gen)) { return result; } + auto slobrok_id = address_to_slobrok_id(address); auto specs = _slobrok_mirror.lookup(slobrok_id); // FIXME string type mismatch; implicit conv! 
if (specs.empty()) { LOG(debug, "Found no mapping for '%s'", slobrok_id.c_str()); @@ -95,10 +97,10 @@ CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& a assert(specs.size() == 1); const auto& connection_spec = specs[0].second; std::unique_lock lock(_targets_rwmutex); - if (auto result = consider_update_target(slobrok_id, connection_spec, curr_slobrok_gen, lock)) { + if (auto result = consider_update_target(address, connection_spec, curr_slobrok_gen, lock)) { return result; } - return insert_new_target_mapping(slobrok_id, connection_spec, curr_slobrok_gen, lock); + return insert_new_target_mapping(address, connection_spec, curr_slobrok_gen, lock); } } diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h index cf94f7545bc..52b505d5476 100644 --- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h +++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h @@ -3,38 +3,44 @@ #include "rpc_target.h" #include "rpc_target_factory.h" +#include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/vespalib/stllike/hash_map.h> #include <memory> #include <shared_mutex> namespace slobrok::api { class IMirrorAPI; } -namespace storage { - -namespace api { class StorageMessageAddress; } - -namespace rpc { +namespace storage::rpc { /** * Class that resolves and caches rpc targets based on StorageMessageAddress that is mapped to slobrok id, * with lookup in a slobrok mirror. 
*/ class CachingRpcTargetResolver { -private: - const slobrok::api::IMirrorAPI& _slobrok_mirror; - const RpcTargetFactory& _target_factory; + + struct AddressInternalHasher { + size_t operator()(const api::StorageMessageAddress& addr) const noexcept { + return addr.internal_storage_hash(); + } + }; + using TargetHashMap = vespalib::hash_map<api::StorageMessageAddress, + std::shared_ptr<RpcTarget>, + AddressInternalHasher>; using UniqueLock = std::unique_lock<std::shared_mutex>; - mutable std::shared_mutex _targets_rwmutex; - // TODO LRU? Size cap? - vespalib::hash_map<vespalib::string, std::shared_ptr<RpcTarget>> _targets; - std::shared_ptr<RpcTarget> lookup_target(const vespalib::string& slobrok_id, uint32_t curr_slobrok_gen); - std::shared_ptr<RpcTarget> consider_update_target(const vespalib::string& slobrok_id, + const slobrok::api::IMirrorAPI& _slobrok_mirror; + const RpcTargetFactory& _target_factory; + mutable std::shared_mutex _targets_rwmutex; + TargetHashMap _targets; // TODO LRU? Size cap? 
+ + std::shared_ptr<RpcTarget> lookup_target(const api::StorageMessageAddress& address, + uint32_t curr_slobrok_gen); + std::shared_ptr<RpcTarget> consider_update_target(const api::StorageMessageAddress& address, const vespalib::string& connection_spec, uint32_t curr_slobrok_gen, const UniqueLock& targets_lock); - std::shared_ptr<RpcTarget> insert_new_target_mapping(const vespalib::string& slobrok_id, + std::shared_ptr<RpcTarget> insert_new_target_mapping(const api::StorageMessageAddress& address, const vespalib::string& connection_spec, uint32_t curr_slobrok_gen, const UniqueLock& targets_lock); @@ -49,5 +55,4 @@ public: std::shared_ptr<RpcTarget> resolve_rpc_target(const api::StorageMessageAddress& address); }; -} // rpc -} // storage +} // storage::rpc diff --git a/storageapi/src/tests/messageapi/CMakeLists.txt b/storageapi/src/tests/messageapi/CMakeLists.txt index 4833dc45acf..50f0b306191 100644 --- a/storageapi/src/tests/messageapi/CMakeLists.txt +++ b/storageapi/src/tests/messageapi/CMakeLists.txt @@ -1,5 +1,8 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_library(storageapi_testmessageapi INTERFACE +vespa_add_library(storageapi_testmessageapi SOURCES + storage_message_address_test.cpp DEPENDS + storageapi + GTest::GTest ) diff --git a/storageapi/src/tests/messageapi/storage_message_address_test.cpp b/storageapi/src/tests/messageapi/storage_message_address_test.cpp new file mode 100644 index 00000000000..c340cba4b28 --- /dev/null +++ b/storageapi/src/tests/messageapi/storage_message_address_test.cpp @@ -0,0 +1,36 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include <vespa/storageapi/messageapi/storagemessage.h> +#include <vespa/vespalib/gtest/gtest.h> + +using namespace ::testing; + +namespace storage::api { + +namespace { + +size_t hash_of(vespalib::stringref cluster, const lib::NodeType& type, uint16_t index) { + return StorageMessageAddress(cluster, type, index).internal_storage_hash(); +} + +} + +TEST(StorageMessageAddressTest, storage_hash_covers_all_expected_fields) { + EXPECT_EQ(hash_of("foo", lib::NodeType::STORAGE, 0), + hash_of("foo", lib::NodeType::STORAGE, 0)); + EXPECT_EQ(hash_of("foo", lib::NodeType::DISTRIBUTOR, 0), + hash_of("foo", lib::NodeType::DISTRIBUTOR, 0)); + EXPECT_EQ(hash_of("foo", lib::NodeType::STORAGE, 123), + hash_of("foo", lib::NodeType::STORAGE, 123)); + + // These tests are all true with extremely high probability, though they do + // depend on a hash function that may inherently cause collisions. + EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), + hash_of("bar", lib::NodeType::STORAGE, 0)); + EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), + hash_of("foo", lib::NodeType::DISTRIBUTOR, 0)); + EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), + hash_of("foo", lib::NodeType::STORAGE, 1)); +} + +} // storage::api diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp index d1bd24f5087..8276587834a 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp @@ -6,6 +6,7 @@ #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/vespalib/stllike/hash_fun.h> #include <sstream> #include <cassert> #include <atomic> @@ -141,8 +142,8 @@ MessageType::print(std::ostream& out, bool verbose, const std::string& indent) c StorageMessageAddress::StorageMessageAddress(const mbus::Route& route) : _route(route), - _retryEnabled(false), 
_protocol(DOCUMENT), + _precomputed_storage_hash(0), _cluster(""), _type(nullptr), _index(0xFFFF) @@ -160,17 +161,34 @@ createAddress(vespalib::stringref cluster, const lib::NodeType& type, uint16_t i return os.str(); } +// TODO we ideally want this removed. Currently just in place to support usage as map key when emplacement not available +StorageMessageAddress::StorageMessageAddress() + : _route(), + _protocol(Protocol::STORAGE), + _precomputed_storage_hash(0), + _cluster(), + _type(nullptr), + _index(0) +{} + + StorageMessageAddress::StorageMessageAddress(vespalib::stringref cluster, const lib::NodeType& type, uint16_t index, Protocol protocol) : _route(), - _retryEnabled(false), _protocol(protocol), + _precomputed_storage_hash(0), _cluster(cluster), _type(&type), _index(index) { std::vector<mbus::IHopDirective::SP> directives; - directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(createAddress(cluster, type, index))); + auto address_as_str = createAddress(cluster, type, index); + // We reuse the string representation and pass it to vespalib's hashValue instead of + // explicitly combining a running hash over the individual fields. This is because + // hashValue internally uses xxhash, which offers great dispersion of bits even for + // minimal changes in the input (such as single bit differences in the index). 
+ _precomputed_storage_hash = vespalib::hashValue(address_as_str.data(), address_as_str.size()); + directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(std::move(address_as_str))); _route.addHop(mbus::Hop(std::move(directives), false)); } @@ -207,7 +225,6 @@ bool StorageMessageAddress::operator==(const StorageMessageAddress& other) const { if (_protocol != other._protocol) return false; - if (_retryEnabled != other._retryEnabled) return false; if (_type != other._type) return false; if (_type) { if (_cluster != other._cluster) return false; @@ -234,9 +251,6 @@ StorageMessageAddress::print(vespalib::asciistream & out) const } else { out << "Document protocol"; } - if (_retryEnabled) { - out << ", retry enabled"; - } if (!_type) { out << ", " << _route.toString() << ")"; } else { diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h index 415bd7717f2..85d4e072171 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h @@ -269,14 +269,15 @@ public: private: mbus::Route _route; - bool _retryEnabled; Protocol _protocol; // Used for internal VDS addresses only + size_t _precomputed_storage_hash; vespalib::string _cluster; const lib::NodeType* _type; uint16_t _index; public: + StorageMessageAddress(); // Only to be used when transient default ctor semantics are needed by containers StorageMessageAddress(const mbus::Route& route); StorageMessageAddress(vespalib::stringref clusterName, const lib::NodeType& type, uint16_t index, @@ -284,15 +285,18 @@ public: ~StorageMessageAddress(); void setProtocol(Protocol p) { _protocol = p; } - void enableRetry(bool enable = true) { _retryEnabled = enable; } const mbus::Route& getRoute() const { return _route; } - bool retryEnabled() const { return _retryEnabled; } Protocol getProtocol() const { return _protocol; } uint16_t getIndex() const; const lib::NodeType& 
getNodeType() const; const vespalib::string& getCluster() const; + // Returns precomputed hash over <cluster, type, index> tuple. Other fields not included. + [[nodiscard]] size_t internal_storage_hash() const noexcept { + return _precomputed_storage_hash; + } + bool operator==(const StorageMessageAddress& other) const; vespalib::string toString() const; friend std::ostream & operator << (std::ostream & os, const StorageMessageAddress & addr); |