diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-22 18:37:44 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-22 18:37:44 +0200 |
commit | 1391037a3feb3293e8821ffa280ef8bc765e2a0b (patch) | |
tree | fbe8aea4c4761519dc76d4b8991d4d2026dc0605 | |
parent | a9f4f35ef4777ca7f18fef41c3cd976b5d15360c (diff) | |
parent | 5afff095ad3ba0b0811b5788ebaeb3c84832d7f6 (diff) |
Merge pull request #14489 from vespa-engine/vekterli/no-address-stringification-in-common-lookup-path
Avoid address stringification in common lookup path
7 files changed, 106 insertions, 42 deletions
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 5471d66a864..99fdb97e435 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -581,7 +581,7 @@ CommunicationManager::sendCommand( auto cmd = std::make_unique<mbusprot::StorageCommand>(msg); cmd->setContext(mbus::Context(msg->getMsgId())); - cmd->setRetryEnabled(address.retryEnabled()); + cmd->setRetryEnabled(false); cmd->setTimeRemaining(msg->getTimeout()); cmd->setTrace(msg->getTrace()); sendMessageBusMessage(msg, std::move(cmd), address.getRoute()); @@ -597,7 +597,7 @@ CommunicationManager::sendCommand( if (mbusMsg) { MBUS_TRACE(msg->getTrace(), 7, "Communication manager: Converted OK"); mbusMsg->setTrace(msg->getTrace()); - mbusMsg->setRetryEnabled(address.retryEnabled()); + mbusMsg->setRetryEnabled(false); { vespalib::LockGuard lock(_messageBusSentLock); diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp index 5241ec6f769..6bcb154aed5 100644 --- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp @@ -33,9 +33,9 @@ CachingRpcTargetResolver::address_to_slobrok_id(const api::StorageMessageAddress } std::shared_ptr<RpcTarget> -CachingRpcTargetResolver::lookup_target(const vespalib::string& slobrok_id, uint32_t curr_slobrok_gen) { +CachingRpcTargetResolver::lookup_target(const api::StorageMessageAddress& address, uint32_t curr_slobrok_gen) { std::shared_lock lock(_targets_rwmutex); - auto itr = _targets.find(slobrok_id); + auto itr = _targets.find(address); if ((itr != _targets.end()) && itr->second->_target->is_valid() && (itr->second->_slobrok_gen == curr_slobrok_gen)) { @@ -45,18 +45,19 @@ CachingRpcTargetResolver::lookup_target(const vespalib::string& slobrok_id, uint } std::shared_ptr<RpcTarget> -CachingRpcTargetResolver::consider_update_target(const vespalib::string& slobrok_id, +CachingRpcTargetResolver::consider_update_target(const api::StorageMessageAddress& address, const vespalib::string& connection_spec, uint32_t curr_slobrok_gen, [[maybe_unused]] const UniqueLock& targets_lock) { // If address has the same spec as the existing target, just reuse it. - auto itr = _targets.find(slobrok_id); + auto itr = _targets.find(address); if ((itr != _targets.end()) && (itr->second->_target->is_valid()) && (itr->second->_spec == connection_spec)) { LOG(debug, "Updating existing mapping '%s' -> '%s' (gen %u) to gen %u", - slobrok_id.c_str(), connection_spec.c_str(), itr->second->_slobrok_gen, curr_slobrok_gen); + address.toString().c_str(), connection_spec.c_str(), + itr->second->_slobrok_gen, curr_slobrok_gen); itr->second->_slobrok_gen = curr_slobrok_gen; return itr->second; } @@ -64,26 +65,27 @@ CachingRpcTargetResolver::consider_update_target(const vespalib::string& slobrok } std::shared_ptr<RpcTarget> -CachingRpcTargetResolver::insert_new_target_mapping(const vespalib::string& slobrok_id, +CachingRpcTargetResolver::insert_new_target_mapping(const api::StorageMessageAddress& address, const vespalib::string& connection_spec, uint32_t curr_slobrok_gen, [[maybe_unused]] const UniqueLock& targets_lock) { auto target = _target_factory.make_target(connection_spec, curr_slobrok_gen); // TODO expensive inside lock? assert(target); std::shared_ptr<RpcTarget> rpc_target(std::move(target)); - _targets[slobrok_id] = rpc_target; - LOG(debug, "Added mapping '%s' -> '%s' at gen %u", slobrok_id.c_str(), connection_spec.c_str(), curr_slobrok_gen); + // TODO emplacement (with replace) semantics to avoid need for default constructed K/V + _targets[address] = rpc_target; + LOG(debug, "Added mapping '%s' -> '%s' at gen %u", address.toString().c_str(), + connection_spec.c_str(), curr_slobrok_gen); return rpc_target; } std::shared_ptr<RpcTarget> CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& address) { - // TODO or map directly from address to target instead of going via stringification? Needs hashing, if so. - auto slobrok_id = address_to_slobrok_id(address); const uint32_t curr_slobrok_gen = _slobrok_mirror.updates(); - if (auto result = lookup_target(slobrok_id, curr_slobrok_gen)) { + if (auto result = lookup_target(address, curr_slobrok_gen)) { return result; } + auto slobrok_id = address_to_slobrok_id(address); auto specs = _slobrok_mirror.lookup(slobrok_id); // FIXME string type mismatch; implicit conv! if (specs.empty()) { LOG(debug, "Found no mapping for '%s'", slobrok_id.c_str()); @@ -95,10 +97,10 @@ CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& a assert(specs.size() == 1); const auto& connection_spec = specs[0].second; std::unique_lock lock(_targets_rwmutex); - if (auto result = consider_update_target(slobrok_id, connection_spec, curr_slobrok_gen, lock)) { + if (auto result = consider_update_target(address, connection_spec, curr_slobrok_gen, lock)) { return result; } - return insert_new_target_mapping(slobrok_id, connection_spec, curr_slobrok_gen, lock); + return insert_new_target_mapping(address, connection_spec, curr_slobrok_gen, lock); } } diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h index cf94f7545bc..52b505d5476 100644 --- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h +++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h @@ -3,38 +3,44 @@ #include "rpc_target.h" #include "rpc_target_factory.h" +#include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/vespalib/stllike/hash_map.h> #include <memory> #include <shared_mutex> namespace slobrok::api { class IMirrorAPI; } -namespace storage { - -namespace api { class StorageMessageAddress; } - -namespace rpc { +namespace storage::rpc { /** * Class that resolves and caches rpc targets based on StorageMessageAddress that is mapped to slobrok id, * with lookup in a slobrok mirror. */ class CachingRpcTargetResolver { -private: - const slobrok::api::IMirrorAPI& _slobrok_mirror; - const RpcTargetFactory& _target_factory; + + struct AddressInternalHasher { + size_t operator()(const api::StorageMessageAddress& addr) const noexcept { + return addr.internal_storage_hash(); + } + }; + using TargetHashMap = vespalib::hash_map<api::StorageMessageAddress, + std::shared_ptr<RpcTarget>, + AddressInternalHasher>; using UniqueLock = std::unique_lock<std::shared_mutex>; - mutable std::shared_mutex _targets_rwmutex; - // TODO LRU? Size cap? - vespalib::hash_map<vespalib::string, std::shared_ptr<RpcTarget>> _targets; - std::shared_ptr<RpcTarget> lookup_target(const vespalib::string& slobrok_id, uint32_t curr_slobrok_gen); - std::shared_ptr<RpcTarget> consider_update_target(const vespalib::string& slobrok_id, + const slobrok::api::IMirrorAPI& _slobrok_mirror; + const RpcTargetFactory& _target_factory; + mutable std::shared_mutex _targets_rwmutex; + TargetHashMap _targets; // TODO LRU? Size cap? + + std::shared_ptr<RpcTarget> lookup_target(const api::StorageMessageAddress& address, + uint32_t curr_slobrok_gen); + std::shared_ptr<RpcTarget> consider_update_target(const api::StorageMessageAddress& address, const vespalib::string& connection_spec, uint32_t curr_slobrok_gen, const UniqueLock& targets_lock); - std::shared_ptr<RpcTarget> insert_new_target_mapping(const vespalib::string& slobrok_id, + std::shared_ptr<RpcTarget> insert_new_target_mapping(const api::StorageMessageAddress& address, const vespalib::string& connection_spec, uint32_t curr_slobrok_gen, const UniqueLock& targets_lock); @@ -49,5 +55,4 @@ public: std::shared_ptr<RpcTarget> resolve_rpc_target(const api::StorageMessageAddress& address); }; -} // rpc -} // storage +} // storage::rpc diff --git a/storageapi/src/tests/messageapi/CMakeLists.txt b/storageapi/src/tests/messageapi/CMakeLists.txt index 4833dc45acf..50f0b306191 100644 --- a/storageapi/src/tests/messageapi/CMakeLists.txt +++ b/storageapi/src/tests/messageapi/CMakeLists.txt @@ -1,5 +1,8 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_library(storageapi_testmessageapi INTERFACE +vespa_add_library(storageapi_testmessageapi SOURCES + storage_message_address_test.cpp DEPENDS + storageapi + GTest::GTest ) diff --git a/storageapi/src/tests/messageapi/storage_message_address_test.cpp b/storageapi/src/tests/messageapi/storage_message_address_test.cpp new file mode 100644 index 00000000000..c340cba4b28 --- /dev/null +++ b/storageapi/src/tests/messageapi/storage_message_address_test.cpp @@ -0,0 +1,36 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/storageapi/messageapi/storagemessage.h> +#include <vespa/vespalib/gtest/gtest.h> + +using namespace ::testing; + +namespace storage::api { + +namespace { + +size_t hash_of(vespalib::stringref cluster, const lib::NodeType& type, uint16_t index) { + return StorageMessageAddress(cluster, type, index).internal_storage_hash(); +} + +} + +TEST(StorageMessageAddressTest, storage_hash_covers_all_expected_fields) { + EXPECT_EQ(hash_of("foo", lib::NodeType::STORAGE, 0), + hash_of("foo", lib::NodeType::STORAGE, 0)); + EXPECT_EQ(hash_of("foo", lib::NodeType::DISTRIBUTOR, 0), + hash_of("foo", lib::NodeType::DISTRIBUTOR, 0)); + EXPECT_EQ(hash_of("foo", lib::NodeType::STORAGE, 123), + hash_of("foo", lib::NodeType::STORAGE, 123)); + + // These tests are all true with extremely high probability, though they do + // depend on a hash function that may inherently cause collisions. + EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), + hash_of("bar", lib::NodeType::STORAGE, 0)); + EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), + hash_of("foo", lib::NodeType::DISTRIBUTOR, 0)); + EXPECT_NE(hash_of("foo", lib::NodeType::STORAGE, 0), + hash_of("foo", lib::NodeType::STORAGE, 1)); +} + +} // storage::api diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp index d1bd24f5087..8276587834a 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.cpp @@ -6,6 +6,7 @@ #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/vespalib/stllike/hash_fun.h> #include <sstream> #include <cassert> #include <atomic> @@ -141,8 +142,8 @@ MessageType::print(std::ostream& out, bool verbose, const std::string& indent) c StorageMessageAddress::StorageMessageAddress(const mbus::Route& route) : _route(route), - _retryEnabled(false), _protocol(DOCUMENT), + _precomputed_storage_hash(0), _cluster(""), _type(nullptr), _index(0xFFFF) @@ -160,17 +161,34 @@ createAddress(vespalib::stringref cluster, const lib::NodeType& type, uint16_t i return os.str(); } +// TODO we ideally want this removed. Currently just in place to support usage as map key when emplacement not available +StorageMessageAddress::StorageMessageAddress() + : _route(), + _protocol(Protocol::STORAGE), + _precomputed_storage_hash(0), + _cluster(), + _type(nullptr), + _index(0) +{} + + StorageMessageAddress::StorageMessageAddress(vespalib::stringref cluster, const lib::NodeType& type, uint16_t index, Protocol protocol) : _route(), - _retryEnabled(false), _protocol(protocol), + _precomputed_storage_hash(0), _cluster(cluster), _type(&type), _index(index) { std::vector<mbus::IHopDirective::SP> directives; - directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(createAddress(cluster, type, index))); + auto address_as_str = createAddress(cluster, type, index); + // We reuse the string representation and pass it to vespalib's hashValue instead of + // explicitly combining a running hash over the individual fields. This is because + // hashValue internally uses xxhash, which offers great dispersion of bits even for + // minimal changes in the input (such as single bit differences in the index). + _precomputed_storage_hash = vespalib::hashValue(address_as_str.data(), address_as_str.size()); + directives.emplace_back(std::make_shared<mbus::VerbatimDirective>(std::move(address_as_str))); _route.addHop(mbus::Hop(std::move(directives), false)); } @@ -207,7 +225,6 @@ bool StorageMessageAddress::operator==(const StorageMessageAddress& other) const { if (_protocol != other._protocol) return false; - if (_retryEnabled != other._retryEnabled) return false; if (_type != other._type) return false; if (_type) { if (_cluster != other._cluster) return false; @@ -234,9 +251,6 @@ StorageMessageAddress::print(vespalib::asciistream & out) const } else { out << "Document protocol"; } - if (_retryEnabled) { - out << ", retry enabled"; - } if (!_type) { out << ", " << _route.toString() << ")"; } else { diff --git a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h index 415bd7717f2..85d4e072171 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagemessage.h +++ b/storageapi/src/vespa/storageapi/messageapi/storagemessage.h @@ -269,14 +269,15 @@ public: private: mbus::Route _route; - bool _retryEnabled; Protocol _protocol; // Used for internal VDS addresses only + size_t _precomputed_storage_hash; vespalib::string _cluster; const lib::NodeType* _type; uint16_t _index; public: + StorageMessageAddress(); // Only to be used when transient default ctor semantics are needed by containers StorageMessageAddress(const mbus::Route& route); StorageMessageAddress(vespalib::stringref clusterName, const lib::NodeType& type, uint16_t index, @@ -284,15 +285,18 @@ public: ~StorageMessageAddress(); void setProtocol(Protocol p) { _protocol = p; } - void enableRetry(bool enable = true) { _retryEnabled = enable; } const mbus::Route& getRoute() const { return _route; } - bool retryEnabled() const { return _retryEnabled; } Protocol getProtocol() const { return _protocol; } uint16_t getIndex() const; const lib::NodeType& getNodeType() const; const vespalib::string& getCluster() const; + // Returns precomputed hash over <cluster, type, index> tuple. Other fields not included. + [[nodiscard]] size_t internal_storage_hash() const noexcept { + return _precomputed_storage_hash; + } + bool operator==(const StorageMessageAddress& other) const; vespalib::string toString() const; friend std::ostream & operator << (std::ostream & os, const StorageMessageAddress & addr); |