diff options
Diffstat (limited to 'storage/src')
14 files changed, 205 insertions, 110 deletions
diff --git a/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp b/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp index 9a2eb4d5c64..6291a8ad0dd 100644 --- a/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp +++ b/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp @@ -29,13 +29,14 @@ public: void inc_gen() { ++gen; } }; -class MockWrappedFrtTarget : public WrappedFrtTarget { +class MockRpcTarget : public RpcTarget { private: bool& _valid; public: - MockWrappedFrtTarget(bool& valid) : _valid(valid) {} + MockRpcTarget(bool& valid) : _valid(valid) {} FRT_Target* get() noexcept override { return nullptr; } bool is_valid() const noexcept override { return _valid; } + const vespalib::string& spec() const noexcept override { abort(); } }; class MockTargetFactory : public RpcTargetFactory { @@ -43,9 +44,8 @@ public: mutable bool valid_target; MockTargetFactory() : valid_target(true) {} - std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec, uint32_t slobrok_gen) const override { - return std::make_unique<RpcTarget>(std::make_unique<MockWrappedFrtTarget>(valid_target), - connection_spec, slobrok_gen); + std::unique_ptr<RpcTarget> make_target([[maybe_unused]] const vespalib::string& connection_spec) const override { + return std::make_unique<MockRpcTarget>(valid_target); } }; @@ -58,15 +58,21 @@ public: StorageMessageAddress address_1; vespalib::string spec_0; vespalib::string spec_1; + uint64_t bucket_id_0; + uint64_t bucket_id_1; + uint64_t bucket_id_2; CachingRpcTargetResolverTest() : mirror(), factory(), - resolver(mirror, factory), + resolver(mirror, factory, 2), address_0("my_cluster", NodeType::STORAGE, 5), address_1("my_cluster", NodeType::DISTRIBUTOR, 7), spec_0("tcp/my:41"), - spec_1("tcp/my:42") + spec_1("tcp/my:42"), + bucket_id_0(3), + bucket_id_1(4), + bucket_id_2(5) { add_mapping(address_0, spec_0); } @@ -76,6 +82,9 @@ public: static vespalib::string to_slobrok_id(const storage::api::StorageMessageAddress& address) { return CachingRpcTargetResolver::address_to_slobrok_id(address); } + std::shared_ptr<RpcTarget> resolve_rpc_target(const StorageMessageAddress& address) { + return resolver.resolve_rpc_target(address, bucket_id_0); + } }; TEST_F(CachingRpcTargetResolverTest, converts_storage_message_address_to_slobrok_id) @@ -86,49 +95,58 @@ TEST_F(CachingRpcTargetResolverTest, converts_storage_message_address_to_slobrok TEST_F(CachingRpcTargetResolverTest, resolves_rpc_target_and_caches_result) { - auto target_a = resolver.resolve_rpc_target(address_0); + auto target_a = resolve_rpc_target(address_0); ASSERT_TRUE(target_a); - EXPECT_EQ(spec_0, target_a->_spec); - EXPECT_EQ(1, target_a->_slobrok_gen); - auto target_b = resolver.resolve_rpc_target(address_0); + auto target_b = resolve_rpc_target(address_0); ASSERT_TRUE(target_b); EXPECT_EQ(target_a.get(), target_b.get()); - EXPECT_EQ(spec_0, target_b->_spec); - EXPECT_EQ(1, target_b->_slobrok_gen); } -TEST_F(CachingRpcTargetResolverTest, cached_rpc_target_is_updated_when_slobrok_generation_changes) +TEST_F(CachingRpcTargetResolverTest, rpc_target_pool_is_updated_when_slobrok_generation_changes) { - auto target_a = resolver.resolve_rpc_target(address_0); + auto target_a = resolve_rpc_target(address_0); mirror.inc_gen(); - auto target_b = resolver.resolve_rpc_target(address_0); + auto target_b = resolve_rpc_target(address_0); EXPECT_EQ(target_a.get(), target_b.get()); - EXPECT_EQ(2, target_b->_slobrok_gen); + auto pool = resolver.resolve_rpc_target_pool(address_0); + EXPECT_EQ(2, pool->slobrok_gen()); } TEST_F(CachingRpcTargetResolverTest, new_rpc_target_is_created_if_connection_spec_changes) { - auto target_a = resolver.resolve_rpc_target(address_0); + auto target_a = resolve_rpc_target(address_0); add_mapping(address_0, spec_1); mirror.inc_gen(); - auto target_b = resolver.resolve_rpc_target(address_0); + auto target_b = resolve_rpc_target(address_0); EXPECT_NE(target_a.get(), target_b.get()); - EXPECT_EQ(spec_1, target_b->_spec); - EXPECT_EQ(2, target_b->_slobrok_gen); + auto pool = resolver.resolve_rpc_target_pool(address_0); + EXPECT_EQ(spec_1, pool->spec()); + EXPECT_EQ(2, pool->slobrok_gen()); } TEST_F(CachingRpcTargetResolverTest, new_rpc_target_is_created_if_raw_target_is_invalid) { - auto target_a = resolver.resolve_rpc_target(address_0); + auto target_a = resolve_rpc_target(address_0); factory.valid_target = false; - auto target_b = resolver.resolve_rpc_target(address_0); + auto target_b = resolve_rpc_target(address_0); EXPECT_NE(target_a.get(), target_b.get()); - EXPECT_EQ(spec_0, target_b->_spec); - EXPECT_EQ(1, target_b->_slobrok_gen); } TEST_F(CachingRpcTargetResolverTest, null_rpc_target_is_returned_if_slobrok_id_is_not_found) { - auto target = resolver.resolve_rpc_target(address_1); + auto target = resolve_rpc_target(address_1); EXPECT_FALSE(target); } + +TEST_F(CachingRpcTargetResolverTest, bucket_id_is_used_to_select_target) +{ + auto target_a = resolver.resolve_rpc_target(address_0, bucket_id_0); + auto target_b = resolver.resolve_rpc_target(address_0, bucket_id_0); + auto target_c = resolver.resolve_rpc_target(address_0, bucket_id_2); + auto target_d = resolver.resolve_rpc_target(address_0, bucket_id_1); + auto target_e = resolver.resolve_rpc_target(address_0, bucket_id_1); + EXPECT_EQ(target_a.get(), target_b.get()); + EXPECT_EQ(target_a.get(), target_c.get()); + EXPECT_EQ(target_d.get(), target_e.get()); + EXPECT_NE(target_a.get(), target_d.get()); +} diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index 3babc98cbb1..cabb8516153 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -63,6 +63,13 @@ use_direct_storageapi_rpc bool default=false ## The number of network (FNET) threads used by the shared rpc resource. rpc.num_network_threads int default=1 +## The number of (FNET) RPC targets to use per node in the cluster. +## +## The bucket id associated with a message is used to select the RPC target. +## This ensures the same RPC target is used for all messages to the same bucket to the same node, +## and the RPC target itself handles sequencing of these messages. +rpc.num_targets_per_node int default=1 + # Minimum size of packets to compress (0 means no compression) rpc.compress.limit int default=1024 diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 86c987a15c5..cfd74bacf16 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -436,6 +436,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> _cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources); rpc::StorageApiRpcService::Params rpc_params; rpc_params.compression_config = convert_to_rpc_compression_config(*config); + rpc_params.num_rpc_targets_per_node = config->rpc.numTargetsPerNode; _storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>( *this, *_shared_rpc_resources, *_message_codec_provider, rpc_params); diff --git a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt index 21498d66781..7e2f4c9c82c 100644 --- a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt +++ b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt @@ -16,7 +16,7 @@ vespa_add_library(storage_storageserver_rpc OBJECT caching_rpc_target_resolver.cpp cluster_controller_api_rpc_service.cpp message_codec_provider.cpp - rpc_target.cpp + rpc_target_pool.cpp shared_rpc_resources.cpp slime_cluster_state_bundle_codec.cpp storage_api_rpc_service.cpp diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp index 6bcb154aed5..c497421f8f7 100644 --- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp @@ -14,10 +14,12 @@ LOG_SETUP(".storage.caching_rpc_target_resolver"); namespace storage::rpc { CachingRpcTargetResolver::CachingRpcTargetResolver(const slobrok::api::IMirrorAPI& slobrok_mirror, - const RpcTargetFactory& target_factory) + const RpcTargetFactory& target_factory, + size_t num_targets_per_node) : _slobrok_mirror(slobrok_mirror), _target_factory(target_factory), - _targets_rwmutex() + _targets_rwmutex(), + _num_targets_per_node(num_targets_per_node) { } @@ -33,56 +35,67 @@ CachingRpcTargetResolver::address_to_slobrok_id(const api::StorageMessageAddress } std::shared_ptr<RpcTarget> -CachingRpcTargetResolver::lookup_target(const api::StorageMessageAddress& address, uint32_t curr_slobrok_gen) { +CachingRpcTargetResolver::lookup_target(const api::StorageMessageAddress& address, uint64_t bucket_id, uint32_t curr_slobrok_gen) { std::shared_lock lock(_targets_rwmutex); auto itr = _targets.find(address); - if ((itr != _targets.end()) - && itr->second->_target->is_valid() - && (itr->second->_slobrok_gen == curr_slobrok_gen)) { - return itr->second; + if (itr != _targets.end()) { + const auto& pool = itr->second; + auto target = pool->get_target(bucket_id); + if (target->is_valid() && (pool->slobrok_gen() == curr_slobrok_gen)) { + return target; + } } return {}; } std::shared_ptr<RpcTarget> -CachingRpcTargetResolver::consider_update_target(const api::StorageMessageAddress& address, - const vespalib::string& connection_spec, - uint32_t curr_slobrok_gen, - [[maybe_unused]] const UniqueLock& targets_lock) { - // If address has the same spec as the existing target, just reuse it. +CachingRpcTargetResolver::consider_update_target_pool(const api::StorageMessageAddress& address, + uint64_t bucket_id, + const vespalib::string& connection_spec, + uint32_t curr_slobrok_gen, + [[maybe_unused]] const UniqueLock& targets_lock) { + // If address has the same spec as the existing target pool, just reuse it. auto itr = _targets.find(address); - if ((itr != _targets.end()) - && (itr->second->_target->is_valid()) - && (itr->second->_spec == connection_spec)) - { - LOG(debug, "Updating existing mapping '%s' -> '%s' (gen %u) to gen %u", - address.toString().c_str(), connection_spec.c_str(), - itr->second->_slobrok_gen, curr_slobrok_gen); - itr->second->_slobrok_gen = curr_slobrok_gen; - return itr->second; + if (itr != _targets.end()) { + auto& pool = itr->second; + auto target = pool->get_target(bucket_id); + if (target->is_valid() && (pool->spec() == connection_spec)) { + LOG(debug, "Updating existing mapping '%s' -> '%s' (gen %u) to gen %u", + address.toString().c_str(), connection_spec.c_str(), + pool->slobrok_gen(), curr_slobrok_gen); + pool->update_slobrok_gen(curr_slobrok_gen); + return target; + } } return {}; } std::shared_ptr<RpcTarget> CachingRpcTargetResolver::insert_new_target_mapping(const api::StorageMessageAddress& address, + uint64_t bucket_id, const vespalib::string& connection_spec, uint32_t curr_slobrok_gen, [[maybe_unused]] const UniqueLock& targets_lock) { - auto target = _target_factory.make_target(connection_spec, curr_slobrok_gen); // TODO expensive inside lock? - assert(target); - std::shared_ptr<RpcTarget> rpc_target(std::move(target)); + RpcTargetPool::RpcTargetVector targets; + targets.reserve(_num_targets_per_node); + for (size_t i = 0; i < _num_targets_per_node; ++i) { + auto target = _target_factory.make_target(connection_spec); // TODO expensive inside lock? + assert(target); + targets.push_back(std::shared_ptr<RpcTarget>(std::move(target))); + } // TODO emplacement (with replace) semantics to avoid need for default constructed K/V - _targets[address] = rpc_target; + auto pool = std::make_shared<RpcTargetPool>(std::move(targets), connection_spec, curr_slobrok_gen); + _targets[address] = pool; LOG(debug, "Added mapping '%s' -> '%s' at gen %u", address.toString().c_str(), connection_spec.c_str(), curr_slobrok_gen); - return rpc_target; + return pool->get_target(bucket_id); } std::shared_ptr<RpcTarget> -CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& address) { +CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& address, + uint64_t bucket_id) { const uint32_t curr_slobrok_gen = _slobrok_mirror.updates(); - if (auto result = lookup_target(address, curr_slobrok_gen)) { + if (auto result = lookup_target(address, bucket_id, curr_slobrok_gen)) { return result; } auto slobrok_id = address_to_slobrok_id(address); @@ -97,10 +110,20 @@ CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& a assert(specs.size() == 1); const auto& connection_spec = specs[0].second; std::unique_lock lock(_targets_rwmutex); - if (auto result = consider_update_target(address, connection_spec, curr_slobrok_gen, lock)) { + if (auto result = consider_update_target_pool(address, bucket_id, connection_spec, curr_slobrok_gen, lock)) { return result; } - return insert_new_target_mapping(address, connection_spec, curr_slobrok_gen, lock); + return insert_new_target_mapping(address, bucket_id, connection_spec, curr_slobrok_gen, lock); +} + +std::shared_ptr<RpcTargetPool> +CachingRpcTargetResolver::resolve_rpc_target_pool(const api::StorageMessageAddress& address) { + std::shared_lock lock(_targets_rwmutex); + auto itr = _targets.find(address); + if (itr != _targets.end()) { + return itr->second; + } + return {}; } } diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h index 52b505d5476..1b614488b2d 100644 --- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h +++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h @@ -3,6 +3,7 @@ #include "rpc_target.h" #include "rpc_target_factory.h" +#include "rpc_target_pool.h" #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/vespalib/stllike/hash_map.h> #include <memory> @@ -24,7 +25,7 @@ class CachingRpcTargetResolver { } }; using TargetHashMap = vespalib::hash_map<api::StorageMessageAddress, - std::shared_ptr<RpcTarget>, + std::shared_ptr<RpcTargetPool>, AddressInternalHasher>; using UniqueLock = std::unique_lock<std::shared_mutex>; @@ -32,27 +33,36 @@ class CachingRpcTargetResolver { const RpcTargetFactory& _target_factory; mutable std::shared_mutex _targets_rwmutex; TargetHashMap _targets; // TODO LRU? Size cap? + size_t _num_targets_per_node; std::shared_ptr<RpcTarget> lookup_target(const api::StorageMessageAddress& address, + uint64_t bucket_id, uint32_t curr_slobrok_gen); - std::shared_ptr<RpcTarget> consider_update_target(const api::StorageMessageAddress& address, - const vespalib::string& connection_spec, - uint32_t curr_slobrok_gen, - const UniqueLock& targets_lock); + std::shared_ptr<RpcTarget> consider_update_target_pool(const api::StorageMessageAddress& address, + uint64_t bucket_id, + const vespalib::string& connection_spec, + uint32_t curr_slobrok_gen, + const UniqueLock& targets_lock); std::shared_ptr<RpcTarget> insert_new_target_mapping(const api::StorageMessageAddress& address, + uint64_t bucket_id, const vespalib::string& connection_spec, uint32_t curr_slobrok_gen, const UniqueLock& targets_lock); public: CachingRpcTargetResolver(const slobrok::api::IMirrorAPI& slobrok_mirror, - const RpcTargetFactory& target_factory); + const RpcTargetFactory& target_factory, + size_t num_targets_per_node); ~CachingRpcTargetResolver(); static vespalib::string address_to_slobrok_id(const api::StorageMessageAddress& address); - std::shared_ptr<RpcTarget> resolve_rpc_target(const api::StorageMessageAddress& address); + std::shared_ptr<RpcTarget> resolve_rpc_target(const api::StorageMessageAddress& address, + uint64_t bucket_id); + + // Should only be used for unit testing + std::shared_ptr<RpcTargetPool> resolve_rpc_target_pool(const api::StorageMessageAddress& address); }; } // storage::rpc diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp b/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp deleted file mode 100644 index 5fcce1e2a6d..00000000000 --- a/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "rpc_target.h" - -namespace storage::rpc { - -RpcTarget::RpcTarget(std::unique_ptr<WrappedFrtTarget> target, vespalib::stringref spec, uint32_t slobrok_gen) - : _target(std::move(target)), - _spec(spec), - _slobrok_gen(slobrok_gen) -{} - -RpcTarget::~RpcTarget() = default; - -} diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target.h index 0f2dec4269c..af93e7101ca 100644 --- a/storage/src/vespa/storage/storageserver/rpc/rpc_target.h +++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target.h @@ -2,7 +2,6 @@ #pragma once #include <vespa/vespalib/stllike/string.h> -#include <cstdint> class FRT_Target; @@ -11,26 +10,12 @@ namespace storage::rpc { /** * Simple wrapper API to access a FRT_Target. */ -class WrappedFrtTarget { +class RpcTarget { public: - virtual ~WrappedFrtTarget() = default; + virtual ~RpcTarget() = default; virtual FRT_Target* get() noexcept = 0; virtual bool is_valid() const noexcept = 0; -}; - -struct RpcTarget { - std::unique_ptr<WrappedFrtTarget> _target; - const vespalib::string _spec; - uint32_t _slobrok_gen; - - RpcTarget(std::unique_ptr<WrappedFrtTarget> target, - vespalib::stringref spec, - uint32_t slobrok_gen); - RpcTarget(const RpcTarget&) = delete; - RpcTarget& operator=(const RpcTarget&) = delete; - RpcTarget(RpcTarget&&) = delete; - RpcTarget& operator=(RpcTarget&&) = delete; - ~RpcTarget(); + virtual const vespalib::string& spec() const noexcept = 0; }; } diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h index ef438e96305..14c356652e1 100644 --- a/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h +++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h @@ -6,7 +6,7 @@ namespace storage::rpc { -struct RpcTarget; +class RpcTarget; /** * Factory for creating instances of RpcTarget based on a connection spec. @@ -14,7 +14,7 @@ struct RpcTarget; class RpcTargetFactory { public: virtual ~RpcTargetFactory() = default; - virtual std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec, uint32_t slobrok_gen) const = 0; + virtual std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec) const = 0; }; } diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp new file mode 100644 index 00000000000..d8671ee3783 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp @@ -0,0 +1,21 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "rpc_target.h" +#include "rpc_target_pool.h" + +namespace storage::rpc { + +RpcTargetPool::RpcTargetPool(RpcTargetVector&& targets, const vespalib::string& spec, uint32_t slobrok_gen) + : _targets(std::move(targets)), + _spec(spec), + _slobrok_gen(slobrok_gen) +{ +} + +std::shared_ptr<RpcTarget> +RpcTargetPool::get_target(uint64_t bucket_id) const +{ + return _targets[bucket_id % _targets.size()]; +} + +} diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h new file mode 100644 index 00000000000..675341e4a1d --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h @@ -0,0 +1,37 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/stllike/string.h> +#include <cstdint> +#include <vector> + +namespace storage::rpc { + +class RpcTarget; + +/** + * A pool of RPC targets used for a single node endpoint. + * + * The bucket id associated with a message is used to select the RPC target. + * This ensures the same RPC target is used for all messages to the same bucket to the same node, + * and the RPC target itself handles sequencing of these messages. + */ +class RpcTargetPool { +public: + using RpcTargetVector = std::vector<std::shared_ptr<RpcTarget>>; + +private: + RpcTargetVector _targets; + const vespalib::string _spec; + uint32_t _slobrok_gen; + +public: + RpcTargetPool(RpcTargetVector&& targets, const vespalib::string& spec, uint32_t slobrok_gen); + const vespalib::string& spec() const { return _spec; } + uint32_t slobrok_gen() const { return _slobrok_gen; } + void update_slobrok_gen(uint32_t curr_slobrok_gen) { _slobrok_gen = curr_slobrok_gen; } + std::shared_ptr<RpcTarget> get_target(uint64_t bucket_id) const; +}; + +} diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp index d21f32aa623..8896df7a155 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp @@ -23,19 +23,22 @@ namespace storage::rpc { namespace { -class WrappedFrtTargetImpl : public WrappedFrtTarget { +class RpcTargetImpl : public RpcTarget { private: FRT_Target* _target; + vespalib::string _spec; public: - WrappedFrtTargetImpl(FRT_Target* target) - : _target(target) + RpcTargetImpl(FRT_Target* target, const vespalib::string& spec) + : _target(target), + _spec(spec) {} - ~WrappedFrtTargetImpl() override { + ~RpcTargetImpl() override { _target->SubRef(); } FRT_Target* get() noexcept override { return _target; } bool is_valid() const noexcept override { return _target->IsValid(); } + const vespalib::string& spec() const noexcept override { return _spec; } }; } @@ -48,11 +51,10 @@ public: RpcTargetFactoryImpl(FRT_Supervisor& orb) : _orb(orb) {} - std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec, uint32_t slobrok_gen) const override { + std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec) const override { auto* raw_target = _orb.GetTarget(connection_spec.c_str()); if (raw_target) { - return std::make_unique<RpcTarget> - (std::make_unique<WrappedFrtTargetImpl>(raw_target), connection_spec, slobrok_gen); + return std::make_unique<RpcTargetImpl>(raw_target, connection_spec); } return std::unique_ptr<RpcTarget>(); } diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp index 8b5c7706510..42b621ea10f 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -35,7 +35,8 @@ StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher _rpc_resources(rpc_resources), _message_codec_provider(message_codec_provider), _params(params), - _target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory())), + _target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory(), + params.num_rpc_targets_per_node)), _direct_rpc_supported(true) { register_server_methods(rpc_resources); @@ -43,7 +44,10 @@ StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher StorageApiRpcService::~StorageApiRpcService() = default; -StorageApiRpcService::Params::Params() = default; +StorageApiRpcService::Params::Params() + : compression_config(), + num_rpc_targets_per_node(1) +{} StorageApiRpcService::Params::~Params() = default; @@ -216,7 +220,7 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma cmd->getType().getName().c_str(), cmd->getAddress()->toString().c_str()); assert(cmd->getAddress() != nullptr); - auto target = _target_resolver->resolve_rpc_target(*cmd->getAddress()); + auto target = _target_resolver->resolve_rpc_target(*cmd->getAddress(), cmd->getBucketId().getId()); if (!target) { auto reply = cmd->makeReply(); reply->setResult(make_no_address_for_service_error(*cmd->getAddress())); @@ -233,7 +237,7 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma vespalib::make_string("Sending request from '%s' to '%s' (%s) with timeout of %g seconds", _rpc_resources.handle().c_str(), CachingRpcTargetResolver::address_to_slobrok_id(*cmd->getAddress()).c_str(), - target->_spec.c_str(), vespalib::to_s(cmd->getTimeout()))); + target->spec().c_str(), vespalib::to_s(cmd->getTimeout()))); } std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(_rpc_resources.supervisor().AllocRPCRequest()); req->SetMethodName(rpc_v1_method_name()); @@ -251,7 +255,7 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma auto& req_ctx = req->getStash().create<RpcRequestContext>(std::move(cmd)); req->SetContext(FNET_Context(&req_ctx)); - target->_target->get()->InvokeAsync(req.release(), vespalib::to_s(timeout), this); + target->get()->InvokeAsync(req.release(), vespalib::to_s(timeout), this); } void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h index cb2344ccd13..3ccd235fc43 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h @@ -37,6 +37,7 @@ class StorageApiRpcService : public FRT_Invokable, public FRT_IRequestWait { public: struct Params { vespalib::compression::CompressionConfig compression_config; + size_t num_rpc_targets_per_node; Params(); ~Params(); |