aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2020-10-02 12:40:10 +0000
committerGeir Storli <geirst@verizonmedia.com>2020-10-02 13:00:53 +0000
commit9f41290e7d90b7f100c6c7bed478873ee73924ef (patch)
treeca69d4f53369e12be3402bd3c155b40cd1d55702
parent188521786cb0d181acf31122ba3803572f86c322 (diff)
Add support for multiple RPC targets per node when using Storage API over RPC.
This should allow for better parallelization and higher feed throughput. The bucket id associated with a message is used to select the RPC target. This ensures the same RPC target is used for all messages to the same bucket to the same node, and the RPC target itself handles sequencing of these messages.
-rw-r--r--searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp8
-rw-r--r--storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp70
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def7
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp1
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt2
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp83
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h24
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp14
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target.h21
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h4
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp21
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h37
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp16
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp14
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h1
15 files changed, 210 insertions, 113 deletions
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp b/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp
index 324f98625f3..15fbb2e2344 100644
--- a/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/bm_cluster_controller.cpp
@@ -44,9 +44,11 @@ BmClusterController::set_cluster_up(bool distributor)
{
StorageMessageAddress storage_address("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0);
auto req = make_set_cluster_state_request();
- auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(_shared_rpc_resources.slobrok_mirror(), _shared_rpc_resources.target_factory());
- auto target = target_resolver->resolve_rpc_target(storage_address);
- target->_target->get()->InvokeSync(req, 10.0); // 10 seconds timeout
+ auto target_resolver = std::make_unique<storage::rpc::CachingRpcTargetResolver>(_shared_rpc_resources.slobrok_mirror(),
+ _shared_rpc_resources.target_factory(), 1);
+ uint64_t fake_bucket_id = 0;
+ auto target = target_resolver->resolve_rpc_target(storage_address, fake_bucket_id);
+ target->get()->InvokeSync(req, 10.0); // 10 seconds timeout
assert(!req->IsError());
req->SubRef();
}
diff --git a/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp b/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp
index 9a2eb4d5c64..6291a8ad0dd 100644
--- a/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp
+++ b/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp
@@ -29,13 +29,14 @@ public:
void inc_gen() { ++gen; }
};
-class MockWrappedFrtTarget : public WrappedFrtTarget {
+class MockRpcTarget : public RpcTarget {
private:
bool& _valid;
public:
- MockWrappedFrtTarget(bool& valid) : _valid(valid) {}
+ MockRpcTarget(bool& valid) : _valid(valid) {}
FRT_Target* get() noexcept override { return nullptr; }
bool is_valid() const noexcept override { return _valid; }
+ const vespalib::string& spec() const noexcept override { abort(); }
};
class MockTargetFactory : public RpcTargetFactory {
@@ -43,9 +44,8 @@ public:
mutable bool valid_target;
MockTargetFactory() : valid_target(true) {}
- std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec, uint32_t slobrok_gen) const override {
- return std::make_unique<RpcTarget>(std::make_unique<MockWrappedFrtTarget>(valid_target),
- connection_spec, slobrok_gen);
+ std::unique_ptr<RpcTarget> make_target([[maybe_unused]] const vespalib::string& connection_spec) const override {
+ return std::make_unique<MockRpcTarget>(valid_target);
}
};
@@ -58,15 +58,21 @@ public:
StorageMessageAddress address_1;
vespalib::string spec_0;
vespalib::string spec_1;
+ uint64_t bucket_id_0;
+ uint64_t bucket_id_1;
+ uint64_t bucket_id_2;
CachingRpcTargetResolverTest()
: mirror(),
factory(),
- resolver(mirror, factory),
+ resolver(mirror, factory, 2),
address_0("my_cluster", NodeType::STORAGE, 5),
address_1("my_cluster", NodeType::DISTRIBUTOR, 7),
spec_0("tcp/my:41"),
- spec_1("tcp/my:42")
+ spec_1("tcp/my:42"),
+ bucket_id_0(3),
+ bucket_id_1(4),
+ bucket_id_2(5)
{
add_mapping(address_0, spec_0);
}
@@ -76,6 +82,9 @@ public:
static vespalib::string to_slobrok_id(const storage::api::StorageMessageAddress& address) {
return CachingRpcTargetResolver::address_to_slobrok_id(address);
}
+ std::shared_ptr<RpcTarget> resolve_rpc_target(const StorageMessageAddress& address) {
+ return resolver.resolve_rpc_target(address, bucket_id_0);
+ }
};
TEST_F(CachingRpcTargetResolverTest, converts_storage_message_address_to_slobrok_id)
@@ -86,49 +95,58 @@ TEST_F(CachingRpcTargetResolverTest, converts_storage_message_address_to_slobrok
TEST_F(CachingRpcTargetResolverTest, resolves_rpc_target_and_caches_result)
{
- auto target_a = resolver.resolve_rpc_target(address_0);
+ auto target_a = resolve_rpc_target(address_0);
ASSERT_TRUE(target_a);
- EXPECT_EQ(spec_0, target_a->_spec);
- EXPECT_EQ(1, target_a->_slobrok_gen);
- auto target_b = resolver.resolve_rpc_target(address_0);
+ auto target_b = resolve_rpc_target(address_0);
ASSERT_TRUE(target_b);
EXPECT_EQ(target_a.get(), target_b.get());
- EXPECT_EQ(spec_0, target_b->_spec);
- EXPECT_EQ(1, target_b->_slobrok_gen);
}
-TEST_F(CachingRpcTargetResolverTest, cached_rpc_target_is_updated_when_slobrok_generation_changes)
+TEST_F(CachingRpcTargetResolverTest, rpc_target_pool_is_updated_when_slobrok_generation_changes)
{
- auto target_a = resolver.resolve_rpc_target(address_0);
+ auto target_a = resolve_rpc_target(address_0);
mirror.inc_gen();
- auto target_b = resolver.resolve_rpc_target(address_0);
+ auto target_b = resolve_rpc_target(address_0);
EXPECT_EQ(target_a.get(), target_b.get());
- EXPECT_EQ(2, target_b->_slobrok_gen);
+ auto pool = resolver.resolve_rpc_target_pool(address_0);
+ EXPECT_EQ(2, pool->slobrok_gen());
}
TEST_F(CachingRpcTargetResolverTest, new_rpc_target_is_created_if_connection_spec_changes)
{
- auto target_a = resolver.resolve_rpc_target(address_0);
+ auto target_a = resolve_rpc_target(address_0);
add_mapping(address_0, spec_1);
mirror.inc_gen();
- auto target_b = resolver.resolve_rpc_target(address_0);
+ auto target_b = resolve_rpc_target(address_0);
EXPECT_NE(target_a.get(), target_b.get());
- EXPECT_EQ(spec_1, target_b->_spec);
- EXPECT_EQ(2, target_b->_slobrok_gen);
+ auto pool = resolver.resolve_rpc_target_pool(address_0);
+ EXPECT_EQ(spec_1, pool->spec());
+ EXPECT_EQ(2, pool->slobrok_gen());
}
TEST_F(CachingRpcTargetResolverTest, new_rpc_target_is_created_if_raw_target_is_invalid)
{
- auto target_a = resolver.resolve_rpc_target(address_0);
+ auto target_a = resolve_rpc_target(address_0);
factory.valid_target = false;
- auto target_b = resolver.resolve_rpc_target(address_0);
+ auto target_b = resolve_rpc_target(address_0);
EXPECT_NE(target_a.get(), target_b.get());
- EXPECT_EQ(spec_0, target_b->_spec);
- EXPECT_EQ(1, target_b->_slobrok_gen);
}
TEST_F(CachingRpcTargetResolverTest, null_rpc_target_is_returned_if_slobrok_id_is_not_found)
{
- auto target = resolver.resolve_rpc_target(address_1);
+ auto target = resolve_rpc_target(address_1);
EXPECT_FALSE(target);
}
+
+TEST_F(CachingRpcTargetResolverTest, bucket_id_is_used_to_select_target)
+{
+ auto target_a = resolver.resolve_rpc_target(address_0, bucket_id_0);
+ auto target_b = resolver.resolve_rpc_target(address_0, bucket_id_0);
+ auto target_c = resolver.resolve_rpc_target(address_0, bucket_id_2);
+ auto target_d = resolver.resolve_rpc_target(address_0, bucket_id_1);
+ auto target_e = resolver.resolve_rpc_target(address_0, bucket_id_1);
+ EXPECT_EQ(target_a.get(), target_b.get());
+ EXPECT_EQ(target_a.get(), target_c.get());
+ EXPECT_EQ(target_d.get(), target_e.get());
+ EXPECT_NE(target_a.get(), target_d.get());
+}
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 3babc98cbb1..cabb8516153 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -63,6 +63,13 @@ use_direct_storageapi_rpc bool default=false
## The number of network (FNET) threads used by the shared rpc resource.
rpc.num_network_threads int default=1
+## The number of (FNET) RPC targets to use per node in the cluster.
+##
+## The bucket id associated with a message is used to select the RPC target.
+## This ensures the same RPC target is used for all messages to the same bucket to the same node,
+## and the RPC target itself handles sequencing of these messages.
+rpc.num_targets_per_node int default=1
+
# Minimum size of packets to compress (0 means no compression)
rpc.compress.limit int default=1024
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 86c987a15c5..cfd74bacf16 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -436,6 +436,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
_cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources);
rpc::StorageApiRpcService::Params rpc_params;
rpc_params.compression_config = convert_to_rpc_compression_config(*config);
+ rpc_params.num_rpc_targets_per_node = config->rpc.numTargetsPerNode;
_storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>(
*this, *_shared_rpc_resources, *_message_codec_provider, rpc_params);
diff --git a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt
index 21498d66781..7e2f4c9c82c 100644
--- a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt
+++ b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt
@@ -16,7 +16,7 @@ vespa_add_library(storage_storageserver_rpc OBJECT
caching_rpc_target_resolver.cpp
cluster_controller_api_rpc_service.cpp
message_codec_provider.cpp
- rpc_target.cpp
+ rpc_target_pool.cpp
shared_rpc_resources.cpp
slime_cluster_state_bundle_codec.cpp
storage_api_rpc_service.cpp
diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
index 6bcb154aed5..c497421f8f7 100644
--- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
@@ -14,10 +14,12 @@ LOG_SETUP(".storage.caching_rpc_target_resolver");
namespace storage::rpc {
CachingRpcTargetResolver::CachingRpcTargetResolver(const slobrok::api::IMirrorAPI& slobrok_mirror,
- const RpcTargetFactory& target_factory)
+ const RpcTargetFactory& target_factory,
+ size_t num_targets_per_node)
: _slobrok_mirror(slobrok_mirror),
_target_factory(target_factory),
- _targets_rwmutex()
+ _targets_rwmutex(),
+ _num_targets_per_node(num_targets_per_node)
{
}
@@ -33,56 +35,67 @@ CachingRpcTargetResolver::address_to_slobrok_id(const api::StorageMessageAddress
}
std::shared_ptr<RpcTarget>
-CachingRpcTargetResolver::lookup_target(const api::StorageMessageAddress& address, uint32_t curr_slobrok_gen) {
+CachingRpcTargetResolver::lookup_target(const api::StorageMessageAddress& address, uint64_t bucket_id, uint32_t curr_slobrok_gen) {
std::shared_lock lock(_targets_rwmutex);
auto itr = _targets.find(address);
- if ((itr != _targets.end())
- && itr->second->_target->is_valid()
- && (itr->second->_slobrok_gen == curr_slobrok_gen)) {
- return itr->second;
+ if (itr != _targets.end()) {
+ const auto& pool = itr->second;
+ auto target = pool->get_target(bucket_id);
+ if (target->is_valid() && (pool->slobrok_gen() == curr_slobrok_gen)) {
+ return target;
+ }
}
return {};
}
std::shared_ptr<RpcTarget>
-CachingRpcTargetResolver::consider_update_target(const api::StorageMessageAddress& address,
- const vespalib::string& connection_spec,
- uint32_t curr_slobrok_gen,
- [[maybe_unused]] const UniqueLock& targets_lock) {
- // If address has the same spec as the existing target, just reuse it.
+CachingRpcTargetResolver::consider_update_target_pool(const api::StorageMessageAddress& address,
+ uint64_t bucket_id,
+ const vespalib::string& connection_spec,
+ uint32_t curr_slobrok_gen,
+ [[maybe_unused]] const UniqueLock& targets_lock) {
+ // If address has the same spec as the existing target pool, just reuse it.
auto itr = _targets.find(address);
- if ((itr != _targets.end())
- && (itr->second->_target->is_valid())
- && (itr->second->_spec == connection_spec))
- {
- LOG(debug, "Updating existing mapping '%s' -> '%s' (gen %u) to gen %u",
- address.toString().c_str(), connection_spec.c_str(),
- itr->second->_slobrok_gen, curr_slobrok_gen);
- itr->second->_slobrok_gen = curr_slobrok_gen;
- return itr->second;
+ if (itr != _targets.end()) {
+ auto& pool = itr->second;
+ auto target = pool->get_target(bucket_id);
+ if (target->is_valid() && (pool->spec() == connection_spec)) {
+ LOG(debug, "Updating existing mapping '%s' -> '%s' (gen %u) to gen %u",
+ address.toString().c_str(), connection_spec.c_str(),
+ pool->slobrok_gen(), curr_slobrok_gen);
+ pool->update_slobrok_gen(curr_slobrok_gen);
+ return target;
+ }
}
return {};
}
std::shared_ptr<RpcTarget>
CachingRpcTargetResolver::insert_new_target_mapping(const api::StorageMessageAddress& address,
+ uint64_t bucket_id,
const vespalib::string& connection_spec,
uint32_t curr_slobrok_gen,
[[maybe_unused]] const UniqueLock& targets_lock) {
- auto target = _target_factory.make_target(connection_spec, curr_slobrok_gen); // TODO expensive inside lock?
- assert(target);
- std::shared_ptr<RpcTarget> rpc_target(std::move(target));
+ RpcTargetPool::RpcTargetVector targets;
+ targets.reserve(_num_targets_per_node);
+ for (size_t i = 0; i < _num_targets_per_node; ++i) {
+ auto target = _target_factory.make_target(connection_spec); // TODO expensive inside lock?
+ assert(target);
+ targets.push_back(std::shared_ptr<RpcTarget>(std::move(target)));
+ }
// TODO emplacement (with replace) semantics to avoid need for default constructed K/V
- _targets[address] = rpc_target;
+ auto pool = std::make_shared<RpcTargetPool>(std::move(targets), connection_spec, curr_slobrok_gen);
+ _targets[address] = pool;
LOG(debug, "Added mapping '%s' -> '%s' at gen %u", address.toString().c_str(),
connection_spec.c_str(), curr_slobrok_gen);
- return rpc_target;
+ return pool->get_target(bucket_id);
}
std::shared_ptr<RpcTarget>
-CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& address) {
+CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& address,
+ uint64_t bucket_id) {
const uint32_t curr_slobrok_gen = _slobrok_mirror.updates();
- if (auto result = lookup_target(address, curr_slobrok_gen)) {
+ if (auto result = lookup_target(address, bucket_id, curr_slobrok_gen)) {
return result;
}
auto slobrok_id = address_to_slobrok_id(address);
@@ -97,10 +110,20 @@ CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& a
assert(specs.size() == 1);
const auto& connection_spec = specs[0].second;
std::unique_lock lock(_targets_rwmutex);
- if (auto result = consider_update_target(address, connection_spec, curr_slobrok_gen, lock)) {
+ if (auto result = consider_update_target_pool(address, bucket_id, connection_spec, curr_slobrok_gen, lock)) {
return result;
}
- return insert_new_target_mapping(address, connection_spec, curr_slobrok_gen, lock);
+ return insert_new_target_mapping(address, bucket_id, connection_spec, curr_slobrok_gen, lock);
+}
+
+std::shared_ptr<RpcTargetPool>
+CachingRpcTargetResolver::resolve_rpc_target_pool(const api::StorageMessageAddress& address) {
+ std::shared_lock lock(_targets_rwmutex);
+ auto itr = _targets.find(address);
+ if (itr != _targets.end()) {
+ return itr->second;
+ }
+ return {};
}
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h
index 52b505d5476..1b614488b2d 100644
--- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h
+++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h
@@ -3,6 +3,7 @@
#include "rpc_target.h"
#include "rpc_target_factory.h"
+#include "rpc_target_pool.h"
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <memory>
@@ -24,7 +25,7 @@ class CachingRpcTargetResolver {
}
};
using TargetHashMap = vespalib::hash_map<api::StorageMessageAddress,
- std::shared_ptr<RpcTarget>,
+ std::shared_ptr<RpcTargetPool>,
AddressInternalHasher>;
using UniqueLock = std::unique_lock<std::shared_mutex>;
@@ -32,27 +33,36 @@ class CachingRpcTargetResolver {
const RpcTargetFactory& _target_factory;
mutable std::shared_mutex _targets_rwmutex;
TargetHashMap _targets; // TODO LRU? Size cap?
+ size_t _num_targets_per_node;
std::shared_ptr<RpcTarget> lookup_target(const api::StorageMessageAddress& address,
+ uint64_t bucket_id,
uint32_t curr_slobrok_gen);
- std::shared_ptr<RpcTarget> consider_update_target(const api::StorageMessageAddress& address,
- const vespalib::string& connection_spec,
- uint32_t curr_slobrok_gen,
- const UniqueLock& targets_lock);
+ std::shared_ptr<RpcTarget> consider_update_target_pool(const api::StorageMessageAddress& address,
+ uint64_t bucket_id,
+ const vespalib::string& connection_spec,
+ uint32_t curr_slobrok_gen,
+ const UniqueLock& targets_lock);
std::shared_ptr<RpcTarget> insert_new_target_mapping(const api::StorageMessageAddress& address,
+ uint64_t bucket_id,
const vespalib::string& connection_spec,
uint32_t curr_slobrok_gen,
const UniqueLock& targets_lock);
public:
CachingRpcTargetResolver(const slobrok::api::IMirrorAPI& slobrok_mirror,
- const RpcTargetFactory& target_factory);
+ const RpcTargetFactory& target_factory,
+ size_t num_targets_per_node);
~CachingRpcTargetResolver();
static vespalib::string address_to_slobrok_id(const api::StorageMessageAddress& address);
- std::shared_ptr<RpcTarget> resolve_rpc_target(const api::StorageMessageAddress& address);
+ std::shared_ptr<RpcTarget> resolve_rpc_target(const api::StorageMessageAddress& address,
+ uint64_t bucket_id);
+
+ // Should only be used for unit testing
+ std::shared_ptr<RpcTargetPool> resolve_rpc_target_pool(const api::StorageMessageAddress& address);
};
} // storage::rpc
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp b/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp
deleted file mode 100644
index 5fcce1e2a6d..00000000000
--- a/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp
+++ /dev/null
@@ -1,14 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "rpc_target.h"
-
-namespace storage::rpc {
-
-RpcTarget::RpcTarget(std::unique_ptr<WrappedFrtTarget> target, vespalib::stringref spec, uint32_t slobrok_gen)
- : _target(std::move(target)),
- _spec(spec),
- _slobrok_gen(slobrok_gen)
-{}
-
-RpcTarget::~RpcTarget() = default;
-
-}
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target.h
index 0f2dec4269c..af93e7101ca 100644
--- a/storage/src/vespa/storage/storageserver/rpc/rpc_target.h
+++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target.h
@@ -2,7 +2,6 @@
#pragma once
#include <vespa/vespalib/stllike/string.h>
-#include <cstdint>
class FRT_Target;
@@ -11,26 +10,12 @@ namespace storage::rpc {
/**
* Simple wrapper API to access a FRT_Target.
*/
-class WrappedFrtTarget {
+class RpcTarget {
public:
- virtual ~WrappedFrtTarget() = default;
+ virtual ~RpcTarget() = default;
virtual FRT_Target* get() noexcept = 0;
virtual bool is_valid() const noexcept = 0;
-};
-
-struct RpcTarget {
- std::unique_ptr<WrappedFrtTarget> _target;
- const vespalib::string _spec;
- uint32_t _slobrok_gen;
-
- RpcTarget(std::unique_ptr<WrappedFrtTarget> target,
- vespalib::stringref spec,
- uint32_t slobrok_gen);
- RpcTarget(const RpcTarget&) = delete;
- RpcTarget& operator=(const RpcTarget&) = delete;
- RpcTarget(RpcTarget&&) = delete;
- RpcTarget& operator=(RpcTarget&&) = delete;
- ~RpcTarget();
+ virtual const vespalib::string& spec() const noexcept = 0;
};
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h
index ef438e96305..14c356652e1 100644
--- a/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h
+++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h
@@ -6,7 +6,7 @@
namespace storage::rpc {
-struct RpcTarget;
+class RpcTarget;
/**
* Factory for creating instances of RpcTarget based on a connection spec.
@@ -14,7 +14,7 @@ struct RpcTarget;
class RpcTargetFactory {
public:
virtual ~RpcTargetFactory() = default;
- virtual std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec, uint32_t slobrok_gen) const = 0;
+ virtual std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec) const = 0;
};
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp
new file mode 100644
index 00000000000..d8671ee3783
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp
@@ -0,0 +1,21 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "rpc_target.h"
+#include "rpc_target_pool.h"
+
+namespace storage::rpc {
+
+RpcTargetPool::RpcTargetPool(RpcTargetVector&& targets, const vespalib::string& spec, uint32_t slobrok_gen)
+ : _targets(std::move(targets)),
+ _spec(spec),
+ _slobrok_gen(slobrok_gen)
+{
+}
+
+std::shared_ptr<RpcTarget>
+RpcTargetPool::get_target(uint64_t bucket_id) const
+{
+ return _targets[bucket_id % _targets.size()];
+}
+
+}
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h
new file mode 100644
index 00000000000..675341e4a1d
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h
@@ -0,0 +1,37 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+#include <cstdint>
+#include <vector>
+
+namespace storage::rpc {
+
+class RpcTarget;
+
+/**
+ * A pool of RPC targets used for a single node endpoint.
+ *
+ * The bucket id associated with a message is used to select the RPC target.
+ * This ensures the same RPC target is used for all messages to the same bucket to the same node,
+ * and the RPC target itself handles sequencing of these messages.
+ */
+class RpcTargetPool {
+public:
+ using RpcTargetVector = std::vector<std::shared_ptr<RpcTarget>>;
+
+private:
+ RpcTargetVector _targets;
+ const vespalib::string _spec;
+ uint32_t _slobrok_gen;
+
+public:
+ RpcTargetPool(RpcTargetVector&& targets, const vespalib::string& spec, uint32_t slobrok_gen);
+ const vespalib::string& spec() const { return _spec; }
+ uint32_t slobrok_gen() const { return _slobrok_gen; }
+ void update_slobrok_gen(uint32_t curr_slobrok_gen) { _slobrok_gen = curr_slobrok_gen; }
+ std::shared_ptr<RpcTarget> get_target(uint64_t bucket_id) const;
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
index d21f32aa623..8896df7a155 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -23,19 +23,22 @@ namespace storage::rpc {
namespace {
-class WrappedFrtTargetImpl : public WrappedFrtTarget {
+class RpcTargetImpl : public RpcTarget {
private:
FRT_Target* _target;
+ vespalib::string _spec;
public:
- WrappedFrtTargetImpl(FRT_Target* target)
- : _target(target)
+ RpcTargetImpl(FRT_Target* target, const vespalib::string& spec)
+ : _target(target),
+ _spec(spec)
{}
- ~WrappedFrtTargetImpl() override {
+ ~RpcTargetImpl() override {
_target->SubRef();
}
FRT_Target* get() noexcept override { return _target; }
bool is_valid() const noexcept override { return _target->IsValid(); }
+ const vespalib::string& spec() const noexcept override { return _spec; }
};
}
@@ -48,11 +51,10 @@ public:
RpcTargetFactoryImpl(FRT_Supervisor& orb)
: _orb(orb)
{}
- std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec, uint32_t slobrok_gen) const override {
+ std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec) const override {
auto* raw_target = _orb.GetTarget(connection_spec.c_str());
if (raw_target) {
- return std::make_unique<RpcTarget>
- (std::make_unique<WrappedFrtTargetImpl>(raw_target), connection_spec, slobrok_gen);
+ return std::make_unique<RpcTargetImpl>(raw_target, connection_spec);
}
return std::unique_ptr<RpcTarget>();
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index 8b5c7706510..42b621ea10f 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -35,7 +35,8 @@ StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher
_rpc_resources(rpc_resources),
_message_codec_provider(message_codec_provider),
_params(params),
- _target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory())),
+ _target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory(),
+ params.num_rpc_targets_per_node)),
_direct_rpc_supported(true)
{
register_server_methods(rpc_resources);
@@ -43,7 +44,10 @@ StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher
StorageApiRpcService::~StorageApiRpcService() = default;
-StorageApiRpcService::Params::Params() = default;
+StorageApiRpcService::Params::Params()
+ : compression_config(),
+ num_rpc_targets_per_node(1)
+{}
StorageApiRpcService::Params::~Params() = default;
@@ -216,7 +220,7 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
cmd->getType().getName().c_str(), cmd->getAddress()->toString().c_str());
assert(cmd->getAddress() != nullptr);
- auto target = _target_resolver->resolve_rpc_target(*cmd->getAddress());
+ auto target = _target_resolver->resolve_rpc_target(*cmd->getAddress(), cmd->getBucketId().getId());
if (!target) {
auto reply = cmd->makeReply();
reply->setResult(make_no_address_for_service_error(*cmd->getAddress()));
@@ -233,7 +237,7 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
vespalib::make_string("Sending request from '%s' to '%s' (%s) with timeout of %g seconds",
_rpc_resources.handle().c_str(),
CachingRpcTargetResolver::address_to_slobrok_id(*cmd->getAddress()).c_str(),
- target->_spec.c_str(), vespalib::to_s(cmd->getTimeout())));
+ target->spec().c_str(), vespalib::to_s(cmd->getTimeout())));
}
std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(_rpc_resources.supervisor().AllocRPCRequest());
req->SetMethodName(rpc_v1_method_name());
@@ -251,7 +255,7 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
auto& req_ctx = req->getStash().create<RpcRequestContext>(std::move(cmd));
req->SetContext(FNET_Context(&req_ctx));
- target->_target->get()->InvokeAsync(req.release(), vespalib::to_s(timeout), this);
+ target->get()->InvokeAsync(req.release(), vespalib::to_s(timeout), this);
}
void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
index cb2344ccd13..3ccd235fc43 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -37,6 +37,7 @@ class StorageApiRpcService : public FRT_Invokable, public FRT_IRequestWait {
public:
struct Params {
vespalib::compression::CompressionConfig compression_config;
+ size_t num_rpc_targets_per_node;
Params();
~Params();