aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp70
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def7
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp1
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt2
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp83
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h24
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp14
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target.h21
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h4
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp21
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h37
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp16
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp14
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h1
14 files changed, 205 insertions, 110 deletions
diff --git a/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp b/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp
index 9a2eb4d5c64..6291a8ad0dd 100644
--- a/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp
+++ b/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp
@@ -29,13 +29,14 @@ public:
void inc_gen() { ++gen; }
};
-class MockWrappedFrtTarget : public WrappedFrtTarget {
+class MockRpcTarget : public RpcTarget {
private:
bool& _valid;
public:
- MockWrappedFrtTarget(bool& valid) : _valid(valid) {}
+ MockRpcTarget(bool& valid) : _valid(valid) {}
FRT_Target* get() noexcept override { return nullptr; }
bool is_valid() const noexcept override { return _valid; }
+ const vespalib::string& spec() const noexcept override { abort(); }
};
class MockTargetFactory : public RpcTargetFactory {
@@ -43,9 +44,8 @@ public:
mutable bool valid_target;
MockTargetFactory() : valid_target(true) {}
- std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec, uint32_t slobrok_gen) const override {
- return std::make_unique<RpcTarget>(std::make_unique<MockWrappedFrtTarget>(valid_target),
- connection_spec, slobrok_gen);
+ std::unique_ptr<RpcTarget> make_target([[maybe_unused]] const vespalib::string& connection_spec) const override {
+ return std::make_unique<MockRpcTarget>(valid_target);
}
};
@@ -58,15 +58,21 @@ public:
StorageMessageAddress address_1;
vespalib::string spec_0;
vespalib::string spec_1;
+ uint64_t bucket_id_0;
+ uint64_t bucket_id_1;
+ uint64_t bucket_id_2;
CachingRpcTargetResolverTest()
: mirror(),
factory(),
- resolver(mirror, factory),
+ resolver(mirror, factory, 2),
address_0("my_cluster", NodeType::STORAGE, 5),
address_1("my_cluster", NodeType::DISTRIBUTOR, 7),
spec_0("tcp/my:41"),
- spec_1("tcp/my:42")
+ spec_1("tcp/my:42"),
+ bucket_id_0(3),
+ bucket_id_1(4),
+ bucket_id_2(5)
{
add_mapping(address_0, spec_0);
}
@@ -76,6 +82,9 @@ public:
static vespalib::string to_slobrok_id(const storage::api::StorageMessageAddress& address) {
return CachingRpcTargetResolver::address_to_slobrok_id(address);
}
+ std::shared_ptr<RpcTarget> resolve_rpc_target(const StorageMessageAddress& address) {
+ return resolver.resolve_rpc_target(address, bucket_id_0);
+ }
};
TEST_F(CachingRpcTargetResolverTest, converts_storage_message_address_to_slobrok_id)
@@ -86,49 +95,58 @@ TEST_F(CachingRpcTargetResolverTest, converts_storage_message_address_to_slobrok
TEST_F(CachingRpcTargetResolverTest, resolves_rpc_target_and_caches_result)
{
- auto target_a = resolver.resolve_rpc_target(address_0);
+ auto target_a = resolve_rpc_target(address_0);
ASSERT_TRUE(target_a);
- EXPECT_EQ(spec_0, target_a->_spec);
- EXPECT_EQ(1, target_a->_slobrok_gen);
- auto target_b = resolver.resolve_rpc_target(address_0);
+ auto target_b = resolve_rpc_target(address_0);
ASSERT_TRUE(target_b);
EXPECT_EQ(target_a.get(), target_b.get());
- EXPECT_EQ(spec_0, target_b->_spec);
- EXPECT_EQ(1, target_b->_slobrok_gen);
}
-TEST_F(CachingRpcTargetResolverTest, cached_rpc_target_is_updated_when_slobrok_generation_changes)
+TEST_F(CachingRpcTargetResolverTest, rpc_target_pool_is_updated_when_slobrok_generation_changes)
{
- auto target_a = resolver.resolve_rpc_target(address_0);
+ auto target_a = resolve_rpc_target(address_0);
mirror.inc_gen();
- auto target_b = resolver.resolve_rpc_target(address_0);
+ auto target_b = resolve_rpc_target(address_0);
EXPECT_EQ(target_a.get(), target_b.get());
- EXPECT_EQ(2, target_b->_slobrok_gen);
+ auto pool = resolver.resolve_rpc_target_pool(address_0);
+ EXPECT_EQ(2, pool->slobrok_gen());
}
TEST_F(CachingRpcTargetResolverTest, new_rpc_target_is_created_if_connection_spec_changes)
{
- auto target_a = resolver.resolve_rpc_target(address_0);
+ auto target_a = resolve_rpc_target(address_0);
add_mapping(address_0, spec_1);
mirror.inc_gen();
- auto target_b = resolver.resolve_rpc_target(address_0);
+ auto target_b = resolve_rpc_target(address_0);
EXPECT_NE(target_a.get(), target_b.get());
- EXPECT_EQ(spec_1, target_b->_spec);
- EXPECT_EQ(2, target_b->_slobrok_gen);
+ auto pool = resolver.resolve_rpc_target_pool(address_0);
+ EXPECT_EQ(spec_1, pool->spec());
+ EXPECT_EQ(2, pool->slobrok_gen());
}
TEST_F(CachingRpcTargetResolverTest, new_rpc_target_is_created_if_raw_target_is_invalid)
{
- auto target_a = resolver.resolve_rpc_target(address_0);
+ auto target_a = resolve_rpc_target(address_0);
factory.valid_target = false;
- auto target_b = resolver.resolve_rpc_target(address_0);
+ auto target_b = resolve_rpc_target(address_0);
EXPECT_NE(target_a.get(), target_b.get());
- EXPECT_EQ(spec_0, target_b->_spec);
- EXPECT_EQ(1, target_b->_slobrok_gen);
}
TEST_F(CachingRpcTargetResolverTest, null_rpc_target_is_returned_if_slobrok_id_is_not_found)
{
- auto target = resolver.resolve_rpc_target(address_1);
+ auto target = resolve_rpc_target(address_1);
EXPECT_FALSE(target);
}
+
+TEST_F(CachingRpcTargetResolverTest, bucket_id_is_used_to_select_target)
+{
+ auto target_a = resolver.resolve_rpc_target(address_0, bucket_id_0);
+ auto target_b = resolver.resolve_rpc_target(address_0, bucket_id_0);
+ auto target_c = resolver.resolve_rpc_target(address_0, bucket_id_2);
+ auto target_d = resolver.resolve_rpc_target(address_0, bucket_id_1);
+ auto target_e = resolver.resolve_rpc_target(address_0, bucket_id_1);
+ EXPECT_EQ(target_a.get(), target_b.get());
+ EXPECT_EQ(target_a.get(), target_c.get());
+ EXPECT_EQ(target_d.get(), target_e.get());
+ EXPECT_NE(target_a.get(), target_d.get());
+}
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 3babc98cbb1..cabb8516153 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -63,6 +63,13 @@ use_direct_storageapi_rpc bool default=false
## The number of network (FNET) threads used by the shared rpc resource.
rpc.num_network_threads int default=1
+## The number of (FNET) RPC targets to use per node in the cluster.
+##
+## The bucket id associated with a message is used to select the RPC target.
+## This ensures the same RPC target is used for all messages to the same bucket to the same node,
+## and the RPC target itself handles sequencing of these messages.
+rpc.num_targets_per_node int default=1
+
# Minimum size of packets to compress (0 means no compression)
rpc.compress.limit int default=1024
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 86c987a15c5..cfd74bacf16 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -436,6 +436,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
_cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources);
rpc::StorageApiRpcService::Params rpc_params;
rpc_params.compression_config = convert_to_rpc_compression_config(*config);
+ rpc_params.num_rpc_targets_per_node = config->rpc.numTargetsPerNode;
_storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>(
*this, *_shared_rpc_resources, *_message_codec_provider, rpc_params);
diff --git a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt
index 21498d66781..7e2f4c9c82c 100644
--- a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt
+++ b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt
@@ -16,7 +16,7 @@ vespa_add_library(storage_storageserver_rpc OBJECT
caching_rpc_target_resolver.cpp
cluster_controller_api_rpc_service.cpp
message_codec_provider.cpp
- rpc_target.cpp
+ rpc_target_pool.cpp
shared_rpc_resources.cpp
slime_cluster_state_bundle_codec.cpp
storage_api_rpc_service.cpp
diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
index 6bcb154aed5..c497421f8f7 100644
--- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
@@ -14,10 +14,12 @@ LOG_SETUP(".storage.caching_rpc_target_resolver");
namespace storage::rpc {
CachingRpcTargetResolver::CachingRpcTargetResolver(const slobrok::api::IMirrorAPI& slobrok_mirror,
- const RpcTargetFactory& target_factory)
+ const RpcTargetFactory& target_factory,
+ size_t num_targets_per_node)
: _slobrok_mirror(slobrok_mirror),
_target_factory(target_factory),
- _targets_rwmutex()
+ _targets_rwmutex(),
+ _num_targets_per_node(num_targets_per_node)
{
}
@@ -33,56 +35,67 @@ CachingRpcTargetResolver::address_to_slobrok_id(const api::StorageMessageAddress
}
std::shared_ptr<RpcTarget>
-CachingRpcTargetResolver::lookup_target(const api::StorageMessageAddress& address, uint32_t curr_slobrok_gen) {
+CachingRpcTargetResolver::lookup_target(const api::StorageMessageAddress& address, uint64_t bucket_id, uint32_t curr_slobrok_gen) {
std::shared_lock lock(_targets_rwmutex);
auto itr = _targets.find(address);
- if ((itr != _targets.end())
- && itr->second->_target->is_valid()
- && (itr->second->_slobrok_gen == curr_slobrok_gen)) {
- return itr->second;
+ if (itr != _targets.end()) {
+ const auto& pool = itr->second;
+ auto target = pool->get_target(bucket_id);
+ if (target->is_valid() && (pool->slobrok_gen() == curr_slobrok_gen)) {
+ return target;
+ }
}
return {};
}
std::shared_ptr<RpcTarget>
-CachingRpcTargetResolver::consider_update_target(const api::StorageMessageAddress& address,
- const vespalib::string& connection_spec,
- uint32_t curr_slobrok_gen,
- [[maybe_unused]] const UniqueLock& targets_lock) {
- // If address has the same spec as the existing target, just reuse it.
+CachingRpcTargetResolver::consider_update_target_pool(const api::StorageMessageAddress& address,
+ uint64_t bucket_id,
+ const vespalib::string& connection_spec,
+ uint32_t curr_slobrok_gen,
+ [[maybe_unused]] const UniqueLock& targets_lock) {
+ // If address has the same spec as the existing target pool, just reuse it.
auto itr = _targets.find(address);
- if ((itr != _targets.end())
- && (itr->second->_target->is_valid())
- && (itr->second->_spec == connection_spec))
- {
- LOG(debug, "Updating existing mapping '%s' -> '%s' (gen %u) to gen %u",
- address.toString().c_str(), connection_spec.c_str(),
- itr->second->_slobrok_gen, curr_slobrok_gen);
- itr->second->_slobrok_gen = curr_slobrok_gen;
- return itr->second;
+ if (itr != _targets.end()) {
+ auto& pool = itr->second;
+ auto target = pool->get_target(bucket_id);
+ if (target->is_valid() && (pool->spec() == connection_spec)) {
+ LOG(debug, "Updating existing mapping '%s' -> '%s' (gen %u) to gen %u",
+ address.toString().c_str(), connection_spec.c_str(),
+ pool->slobrok_gen(), curr_slobrok_gen);
+ pool->update_slobrok_gen(curr_slobrok_gen);
+ return target;
+ }
}
return {};
}
std::shared_ptr<RpcTarget>
CachingRpcTargetResolver::insert_new_target_mapping(const api::StorageMessageAddress& address,
+ uint64_t bucket_id,
const vespalib::string& connection_spec,
uint32_t curr_slobrok_gen,
[[maybe_unused]] const UniqueLock& targets_lock) {
- auto target = _target_factory.make_target(connection_spec, curr_slobrok_gen); // TODO expensive inside lock?
- assert(target);
- std::shared_ptr<RpcTarget> rpc_target(std::move(target));
+ RpcTargetPool::RpcTargetVector targets;
+ targets.reserve(_num_targets_per_node);
+ for (size_t i = 0; i < _num_targets_per_node; ++i) {
+ auto target = _target_factory.make_target(connection_spec); // TODO expensive inside lock?
+ assert(target);
+ targets.push_back(std::shared_ptr<RpcTarget>(std::move(target)));
+ }
// TODO emplacement (with replace) semantics to avoid need for default constructed K/V
- _targets[address] = rpc_target;
+ auto pool = std::make_shared<RpcTargetPool>(std::move(targets), connection_spec, curr_slobrok_gen);
+ _targets[address] = pool;
LOG(debug, "Added mapping '%s' -> '%s' at gen %u", address.toString().c_str(),
connection_spec.c_str(), curr_slobrok_gen);
- return rpc_target;
+ return pool->get_target(bucket_id);
}
std::shared_ptr<RpcTarget>
-CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& address) {
+CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& address,
+ uint64_t bucket_id) {
const uint32_t curr_slobrok_gen = _slobrok_mirror.updates();
- if (auto result = lookup_target(address, curr_slobrok_gen)) {
+ if (auto result = lookup_target(address, bucket_id, curr_slobrok_gen)) {
return result;
}
auto slobrok_id = address_to_slobrok_id(address);
@@ -97,10 +110,20 @@ CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& a
assert(specs.size() == 1);
const auto& connection_spec = specs[0].second;
std::unique_lock lock(_targets_rwmutex);
- if (auto result = consider_update_target(address, connection_spec, curr_slobrok_gen, lock)) {
+ if (auto result = consider_update_target_pool(address, bucket_id, connection_spec, curr_slobrok_gen, lock)) {
return result;
}
- return insert_new_target_mapping(address, connection_spec, curr_slobrok_gen, lock);
+ return insert_new_target_mapping(address, bucket_id, connection_spec, curr_slobrok_gen, lock);
+}
+
+std::shared_ptr<RpcTargetPool>
+CachingRpcTargetResolver::resolve_rpc_target_pool(const api::StorageMessageAddress& address) {
+ std::shared_lock lock(_targets_rwmutex);
+ auto itr = _targets.find(address);
+ if (itr != _targets.end()) {
+ return itr->second;
+ }
+ return {};
}
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h
index 52b505d5476..1b614488b2d 100644
--- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h
+++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h
@@ -3,6 +3,7 @@
#include "rpc_target.h"
#include "rpc_target_factory.h"
+#include "rpc_target_pool.h"
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <memory>
@@ -24,7 +25,7 @@ class CachingRpcTargetResolver {
}
};
using TargetHashMap = vespalib::hash_map<api::StorageMessageAddress,
- std::shared_ptr<RpcTarget>,
+ std::shared_ptr<RpcTargetPool>,
AddressInternalHasher>;
using UniqueLock = std::unique_lock<std::shared_mutex>;
@@ -32,27 +33,36 @@ class CachingRpcTargetResolver {
const RpcTargetFactory& _target_factory;
mutable std::shared_mutex _targets_rwmutex;
TargetHashMap _targets; // TODO LRU? Size cap?
+ size_t _num_targets_per_node;
std::shared_ptr<RpcTarget> lookup_target(const api::StorageMessageAddress& address,
+ uint64_t bucket_id,
uint32_t curr_slobrok_gen);
- std::shared_ptr<RpcTarget> consider_update_target(const api::StorageMessageAddress& address,
- const vespalib::string& connection_spec,
- uint32_t curr_slobrok_gen,
- const UniqueLock& targets_lock);
+ std::shared_ptr<RpcTarget> consider_update_target_pool(const api::StorageMessageAddress& address,
+ uint64_t bucket_id,
+ const vespalib::string& connection_spec,
+ uint32_t curr_slobrok_gen,
+ const UniqueLock& targets_lock);
std::shared_ptr<RpcTarget> insert_new_target_mapping(const api::StorageMessageAddress& address,
+ uint64_t bucket_id,
const vespalib::string& connection_spec,
uint32_t curr_slobrok_gen,
const UniqueLock& targets_lock);
public:
CachingRpcTargetResolver(const slobrok::api::IMirrorAPI& slobrok_mirror,
- const RpcTargetFactory& target_factory);
+ const RpcTargetFactory& target_factory,
+ size_t num_targets_per_node);
~CachingRpcTargetResolver();
static vespalib::string address_to_slobrok_id(const api::StorageMessageAddress& address);
- std::shared_ptr<RpcTarget> resolve_rpc_target(const api::StorageMessageAddress& address);
+ std::shared_ptr<RpcTarget> resolve_rpc_target(const api::StorageMessageAddress& address,
+ uint64_t bucket_id);
+
+ // Should only be used for unit testing
+ std::shared_ptr<RpcTargetPool> resolve_rpc_target_pool(const api::StorageMessageAddress& address);
};
} // storage::rpc
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp b/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp
deleted file mode 100644
index 5fcce1e2a6d..00000000000
--- a/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp
+++ /dev/null
@@ -1,14 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "rpc_target.h"
-
-namespace storage::rpc {
-
-RpcTarget::RpcTarget(std::unique_ptr<WrappedFrtTarget> target, vespalib::stringref spec, uint32_t slobrok_gen)
- : _target(std::move(target)),
- _spec(spec),
- _slobrok_gen(slobrok_gen)
-{}
-
-RpcTarget::~RpcTarget() = default;
-
-}
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target.h
index 0f2dec4269c..af93e7101ca 100644
--- a/storage/src/vespa/storage/storageserver/rpc/rpc_target.h
+++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target.h
@@ -2,7 +2,6 @@
#pragma once
#include <vespa/vespalib/stllike/string.h>
-#include <cstdint>
class FRT_Target;
@@ -11,26 +10,12 @@ namespace storage::rpc {
/**
* Simple wrapper API to access a FRT_Target.
*/
-class WrappedFrtTarget {
+class RpcTarget {
public:
- virtual ~WrappedFrtTarget() = default;
+ virtual ~RpcTarget() = default;
virtual FRT_Target* get() noexcept = 0;
virtual bool is_valid() const noexcept = 0;
-};
-
-struct RpcTarget {
- std::unique_ptr<WrappedFrtTarget> _target;
- const vespalib::string _spec;
- uint32_t _slobrok_gen;
-
- RpcTarget(std::unique_ptr<WrappedFrtTarget> target,
- vespalib::stringref spec,
- uint32_t slobrok_gen);
- RpcTarget(const RpcTarget&) = delete;
- RpcTarget& operator=(const RpcTarget&) = delete;
- RpcTarget(RpcTarget&&) = delete;
- RpcTarget& operator=(RpcTarget&&) = delete;
- ~RpcTarget();
+ virtual const vespalib::string& spec() const noexcept = 0;
};
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h
index ef438e96305..14c356652e1 100644
--- a/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h
+++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h
@@ -6,7 +6,7 @@
namespace storage::rpc {
-struct RpcTarget;
+class RpcTarget;
/**
* Factory for creating instances of RpcTarget based on a connection spec.
@@ -14,7 +14,7 @@ struct RpcTarget;
class RpcTargetFactory {
public:
virtual ~RpcTargetFactory() = default;
- virtual std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec, uint32_t slobrok_gen) const = 0;
+ virtual std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec) const = 0;
};
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp
new file mode 100644
index 00000000000..d8671ee3783
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp
@@ -0,0 +1,21 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "rpc_target.h"
+#include "rpc_target_pool.h"
+
+namespace storage::rpc {
+
+RpcTargetPool::RpcTargetPool(RpcTargetVector&& targets, const vespalib::string& spec, uint32_t slobrok_gen)
+ : _targets(std::move(targets)),
+ _spec(spec),
+ _slobrok_gen(slobrok_gen)
+{
+}
+
+std::shared_ptr<RpcTarget>
+RpcTargetPool::get_target(uint64_t bucket_id) const
+{
+ return _targets[bucket_id % _targets.size()];
+}
+
+}
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h
new file mode 100644
index 00000000000..675341e4a1d
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h
@@ -0,0 +1,37 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+#include <cstdint>
+#include <vector>
+
+namespace storage::rpc {
+
+class RpcTarget;
+
+/**
+ * A pool of RPC targets used for a single node endpoint.
+ *
+ * The bucket id associated with a message is used to select the RPC target.
+ * This ensures the same RPC target is used for all messages to the same bucket to the same node,
+ * and the RPC target itself handles sequencing of these messages.
+ */
+class RpcTargetPool {
+public:
+ using RpcTargetVector = std::vector<std::shared_ptr<RpcTarget>>;
+
+private:
+ RpcTargetVector _targets;
+ const vespalib::string _spec;
+ uint32_t _slobrok_gen;
+
+public:
+ RpcTargetPool(RpcTargetVector&& targets, const vespalib::string& spec, uint32_t slobrok_gen);
+ const vespalib::string& spec() const { return _spec; }
+ uint32_t slobrok_gen() const { return _slobrok_gen; }
+ void update_slobrok_gen(uint32_t curr_slobrok_gen) { _slobrok_gen = curr_slobrok_gen; }
+ std::shared_ptr<RpcTarget> get_target(uint64_t bucket_id) const;
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
index d21f32aa623..8896df7a155 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -23,19 +23,22 @@ namespace storage::rpc {
namespace {
-class WrappedFrtTargetImpl : public WrappedFrtTarget {
+class RpcTargetImpl : public RpcTarget {
private:
FRT_Target* _target;
+ vespalib::string _spec;
public:
- WrappedFrtTargetImpl(FRT_Target* target)
- : _target(target)
+ RpcTargetImpl(FRT_Target* target, const vespalib::string& spec)
+ : _target(target),
+ _spec(spec)
{}
- ~WrappedFrtTargetImpl() override {
+ ~RpcTargetImpl() override {
_target->SubRef();
}
FRT_Target* get() noexcept override { return _target; }
bool is_valid() const noexcept override { return _target->IsValid(); }
+ const vespalib::string& spec() const noexcept override { return _spec; }
};
}
@@ -48,11 +51,10 @@ public:
RpcTargetFactoryImpl(FRT_Supervisor& orb)
: _orb(orb)
{}
- std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec, uint32_t slobrok_gen) const override {
+ std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec) const override {
auto* raw_target = _orb.GetTarget(connection_spec.c_str());
if (raw_target) {
- return std::make_unique<RpcTarget>
- (std::make_unique<WrappedFrtTargetImpl>(raw_target), connection_spec, slobrok_gen);
+ return std::make_unique<RpcTargetImpl>(raw_target, connection_spec);
}
return std::unique_ptr<RpcTarget>();
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index 8b5c7706510..42b621ea10f 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -35,7 +35,8 @@ StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher
_rpc_resources(rpc_resources),
_message_codec_provider(message_codec_provider),
_params(params),
- _target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory())),
+ _target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory(),
+ params.num_rpc_targets_per_node)),
_direct_rpc_supported(true)
{
register_server_methods(rpc_resources);
@@ -43,7 +44,10 @@ StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher
StorageApiRpcService::~StorageApiRpcService() = default;
-StorageApiRpcService::Params::Params() = default;
+StorageApiRpcService::Params::Params()
+ : compression_config(),
+ num_rpc_targets_per_node(1)
+{}
StorageApiRpcService::Params::~Params() = default;
@@ -216,7 +220,7 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
cmd->getType().getName().c_str(), cmd->getAddress()->toString().c_str());
assert(cmd->getAddress() != nullptr);
- auto target = _target_resolver->resolve_rpc_target(*cmd->getAddress());
+ auto target = _target_resolver->resolve_rpc_target(*cmd->getAddress(), cmd->getBucketId().getId());
if (!target) {
auto reply = cmd->makeReply();
reply->setResult(make_no_address_for_service_error(*cmd->getAddress()));
@@ -233,7 +237,7 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
vespalib::make_string("Sending request from '%s' to '%s' (%s) with timeout of %g seconds",
_rpc_resources.handle().c_str(),
CachingRpcTargetResolver::address_to_slobrok_id(*cmd->getAddress()).c_str(),
- target->_spec.c_str(), vespalib::to_s(cmd->getTimeout())));
+ target->spec().c_str(), vespalib::to_s(cmd->getTimeout())));
}
std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(_rpc_resources.supervisor().AllocRPCRequest());
req->SetMethodName(rpc_v1_method_name());
@@ -251,7 +255,7 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
auto& req_ctx = req->getStash().create<RpcRequestContext>(std::move(cmd));
req->SetContext(FNET_Context(&req_ctx));
- target->_target->get()->InvokeAsync(req.release(), vespalib::to_s(timeout), this);
+ target->get()->InvokeAsync(req.release(), vespalib::to_s(timeout), this);
}
void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
index cb2344ccd13..3ccd235fc43 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -37,6 +37,7 @@ class StorageApiRpcService : public FRT_Invokable, public FRT_IRequestWait {
public:
struct Params {
vespalib::compression::CompressionConfig compression_config;
+ size_t num_rpc_targets_per_node;
Params();
~Params();