aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2020-09-11 11:34:44 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-09-14 13:22:15 +0000
commitfb531dcd1963725b657d0412f1ca8ab97352db08 (patch)
tree9176104547d84c923d1a64812686d3dddeb7af6f /storage
parent06a0f822bc6f90e64e6d8510e1b3f5ef3cc037ab (diff)
Decouple CachingRpcTargetResolver from slobrok and fnet implementations and write unit tests.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/rpc/CMakeLists.txt1
-rw-r--r--storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp134
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp107
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h33
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp11
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target.h19
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h21
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp44
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h5
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp4
10 files changed, 318 insertions, 61 deletions
diff --git a/storage/src/tests/storageserver/rpc/CMakeLists.txt b/storage/src/tests/storageserver/rpc/CMakeLists.txt
index f240bb3a6c7..a04209f77e8 100644
--- a/storage/src/tests/storageserver/rpc/CMakeLists.txt
+++ b/storage/src/tests/storageserver/rpc/CMakeLists.txt
@@ -1,6 +1,7 @@
# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_executable(storage_storageserver_rpc_gtest_runner_app TEST
SOURCES
+ caching_rpc_target_resolver_test.cpp
cluster_controller_rpc_api_service_test.cpp
message_codec_provider_test.cpp
gtest_runner.cpp
diff --git a/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp b/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp
new file mode 100644
index 00000000000..bdf0aaee610
--- /dev/null
+++ b/storage/src/tests/storageserver/rpc/caching_rpc_target_resolver_test.cpp
@@ -0,0 +1,134 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/slobrok/imirrorapi.h>
+#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h>
+#include <vespa/storageapi/messageapi/storagemessage.h>
+#include <vespa/vdslib/state/nodetype.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace storage::rpc;
+using slobrok::api::IMirrorAPI;
+using storage::api::StorageMessageAddress;
+using storage::lib::NodeType;
+
+class MockMirror : public IMirrorAPI {
+public:
+ using Mappings = std::map<vespalib::string, IMirrorAPI::SpecList>;
+ Mappings mappings;
+ uint32_t gen;
+ MockMirror() : mappings(), gen(1) {}
+ SpecList lookup(const std::string& pattern) const override {
+ auto itr = mappings.find(pattern);
+ if (itr != mappings.end()) {
+ return itr->second;
+ }
+ return {};
+ }
+ uint32_t updates() const override { return gen; }
+ bool ready() const override { return true; }
+ void inc_gen() { ++gen; }
+};
+
+class MockWrappedFrtTarget : public WrappedFrtTarget {
+private:
+ bool& _valid;
+public:
+ MockWrappedFrtTarget(bool& valid) : _valid(valid) {}
+ FRT_Target* get() override { return nullptr; }
+ bool is_valid() const override { return _valid; }
+};
+
+class MockTargetFactory : public RpcTargetFactory {
+public:
+ mutable bool valid_target;
+
+ MockTargetFactory() : valid_target(true) {}
+ std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec, uint32_t slobrok_gen) const override {
+ return std::make_unique<RpcTarget>(std::make_unique<MockWrappedFrtTarget>(valid_target),
+ connection_spec, slobrok_gen);
+ }
+};
+
+class CachingRpcTargetResolverTest : public ::testing::Test {
+public:
+ MockMirror mirror;
+ MockTargetFactory factory;
+ CachingRpcTargetResolver resolver;
+ StorageMessageAddress address_0;
+ StorageMessageAddress address_1;
+ vespalib::string spec_0;
+ vespalib::string spec_1;
+
+ CachingRpcTargetResolverTest()
+ : mirror(),
+ factory(),
+ resolver(mirror, factory),
+ address_0("my_cluster", NodeType::STORAGE, 5),
+ address_1("my_cluster", NodeType::DISTRIBUTOR, 7),
+ spec_0("tcp/my:41"),
+ spec_1("tcp/my:42")
+ {
+ add_mapping(address_0, spec_0);
+ }
+ void add_mapping(const StorageMessageAddress& address, const vespalib::string& connection_spec) {
+ mirror.mappings[to_slobrok_id(address)] = {{to_slobrok_id(address), connection_spec}};
+ }
+ vespalib::string to_slobrok_id(const storage::api::StorageMessageAddress& address) const {
+ return CachingRpcTargetResolver::address_to_slobrok_id(address);
+ }
+};
+
+TEST_F(CachingRpcTargetResolverTest, converts_storage_message_address_to_slobrok_id)
+{
+ EXPECT_EQ("storage/cluster.my_cluster/storage/5", to_slobrok_id(address_0));
+ EXPECT_EQ("storage/cluster.my_cluster/distributor/7", to_slobrok_id(address_1));
+}
+
+TEST_F(CachingRpcTargetResolverTest, resolves_rpc_target_and_caches_result)
+{
+ auto target_a = resolver.resolve_rpc_target(address_0);
+ ASSERT_TRUE(target_a);
+ EXPECT_EQ(spec_0, target_a->_spec);
+ EXPECT_EQ(1, target_a->_slobrok_gen);
+ auto target_b = resolver.resolve_rpc_target(address_0);
+ ASSERT_TRUE(target_b);
+ EXPECT_EQ(target_a.get(), target_b.get());
+ EXPECT_EQ(spec_0, target_b->_spec);
+ EXPECT_EQ(1, target_b->_slobrok_gen);
+}
+
+TEST_F(CachingRpcTargetResolverTest, cached_rpc_target_is_updated_when_slobrok_generation_changes)
+{
+ auto target_a = resolver.resolve_rpc_target(address_0);
+ mirror.inc_gen();
+ auto target_b = resolver.resolve_rpc_target(address_0);
+ EXPECT_EQ(target_a.get(), target_b.get());
+ EXPECT_EQ(2, target_b->_slobrok_gen);
+}
+
+TEST_F(CachingRpcTargetResolverTest, new_rpc_target_is_created_if_connection_spec_changes)
+{
+ auto target_a = resolver.resolve_rpc_target(address_0);
+ add_mapping(address_0, spec_1);
+ mirror.inc_gen();
+ auto target_b = resolver.resolve_rpc_target(address_0);
+ EXPECT_NE(target_a.get(), target_b.get());
+ EXPECT_EQ(spec_1, target_b->_spec);
+ EXPECT_EQ(2, target_b->_slobrok_gen);
+}
+
+TEST_F(CachingRpcTargetResolverTest, new_rpc_target_is_created_if_raw_target_is_invalid)
+{
+ auto target_a = resolver.resolve_rpc_target(address_0);
+ factory.valid_target = false;
+ auto target_b = resolver.resolve_rpc_target(address_0);
+ EXPECT_NE(target_a.get(), target_b.get());
+ EXPECT_EQ(spec_0, target_b->_spec);
+ EXPECT_EQ(1, target_b->_slobrok_gen);
+}
+
+TEST_F(CachingRpcTargetResolverTest, null_rpc_target_is_returned_if_slobrok_id_is_not_found)
+{
+ auto target = resolver.resolve_rpc_target(address_1);
+ EXPECT_FALSE(target);
+}
diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
index 7ccd01e2ccc..14c435a8663 100644
--- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp
@@ -1,28 +1,30 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "caching_rpc_target_resolver.h"
#include "shared_rpc_resources.h"
-#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/frt/target.h>
-#include <vespa/slobrok/sbmirror.h>
+#include <vespa/slobrok/imirrorapi.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <cassert>
#include <vespa/log/log.h>
LOG_SETUP(".storage.caching_rpc_target_resolver");
namespace storage::rpc {
-CachingRpcTargetResolver::CachingRpcTargetResolver(SharedRpcResources& rpc_resources)
- : _rpc_resources(rpc_resources)
+CachingRpcTargetResolver::CachingRpcTargetResolver(const slobrok::api::IMirrorAPI& slobrok_mirror,
+ const RpcTargetFactory& target_factory)
+ : _slobrok_mirror(slobrok_mirror),
+ _target_factory(target_factory),
+ _targets_rwmutex()
{
}
CachingRpcTargetResolver::~CachingRpcTargetResolver() = default;
-namespace {
-
-vespalib::string address_to_slobrok_id(const api::StorageMessageAddress& address) {
+vespalib::string
+CachingRpcTargetResolver::address_to_slobrok_id(const api::StorageMessageAddress& address) {
vespalib::asciistream as;
as << "storage/cluster." << address.getCluster()
<< '/' << ((address.getNodeType() == lib::NodeType::STORAGE) ? "storage" : "distributor")
@@ -30,53 +32,76 @@ vespalib::string address_to_slobrok_id(const api::StorageMessageAddress& address
return as.str();
}
+std::shared_ptr<RpcTarget>
+CachingRpcTargetResolver::lookup_target(const vespalib::string& slobrok_id, uint32_t curr_slobrok_gen) {
+ std::shared_lock lock(_targets_rwmutex);
+ auto itr = _targets.find(slobrok_id);
+ if ((itr != _targets.end())
+ && itr->second->_target->is_valid()
+ && (itr->second->_slobrok_gen == curr_slobrok_gen)) {
+ return itr->second;
+ }
+ return {};
+}
+
+std::shared_ptr<RpcTarget>
+CachingRpcTargetResolver::consider_update_target(const vespalib::string& slobrok_id,
+ const vespalib::string& connection_spec,
+ uint32_t curr_slobrok_gen,
+ const UniqueLock& targets_lock) {
+ (void) targets_lock;
+ // If address has the same spec as the existing target, just reuse it.
+ auto itr = _targets.find(slobrok_id);
+ if ((itr != _targets.end())
+ && (itr->second->_target->is_valid())
+ && (itr->second->_spec == connection_spec))
+ {
+ LOG(info, "Updating existing mapping '%s' -> '%s' (gen %u) to gen %u",
+ slobrok_id.c_str(), connection_spec.c_str(), itr->second->_slobrok_gen, curr_slobrok_gen);
+ itr->second->_slobrok_gen = curr_slobrok_gen;
+ return itr->second;
+ }
+ return {};
+}
+
+std::shared_ptr<RpcTarget>
+CachingRpcTargetResolver::insert_new_target_mapping(const vespalib::string& slobrok_id,
+ const vespalib::string& connection_spec,
+ uint32_t curr_slobrok_gen,
+ const UniqueLock& targets_lock) {
+ (void) targets_lock;
+ auto target = _target_factory.make_target(connection_spec, curr_slobrok_gen); // TODO expensive inside lock?
+ assert(target);
+ std::shared_ptr<RpcTarget> rpc_target(std::move(target));
+ _targets[slobrok_id] = rpc_target;
+ LOG(info, "Added mapping '%s' -> '%s' at gen %u", slobrok_id.c_str(), connection_spec.c_str(), curr_slobrok_gen);
+ return rpc_target;
}
-// TODO ensure this is robust and performant wrt. visitor clients constantly bumping
-// slobrok generations by registering new sessions all the time.
std::shared_ptr<RpcTarget>
CachingRpcTargetResolver::resolve_rpc_target(const api::StorageMessageAddress& address) {
// TODO or map directly from address to target instead of going via stringification? Needs hashing, if so.
- auto sb_id = address_to_slobrok_id(address);
- const uint32_t current_sb_gen = _rpc_resources.slobrok_mirror().updates();
- {
- std::shared_lock lock(_targets_rwmutex);
- auto target_iter = _targets.find(sb_id);
- if ((target_iter != _targets.end())
- && target_iter->second->_target->IsValid()
- && (target_iter->second->_sb_generation == current_sb_gen))
- {
- return target_iter->second;
- }
+ auto slobrok_id = address_to_slobrok_id(address);
+ const uint32_t curr_slobrok_gen = _slobrok_mirror.updates();
+ if (auto result = lookup_target(slobrok_id, curr_slobrok_gen)) {
+ return result;
}
- auto specs = _rpc_resources.slobrok_mirror().lookup(sb_id); // FIXME string type mismatch; implicit conv!
+ auto specs = _slobrok_mirror.lookup(slobrok_id); // FIXME string type mismatch; implicit conv!
if (specs.empty()) {
- LOG(info, "Found no mapping for %s", sb_id.c_str());
+ // TODO: Replace all info logging with debug logging.
+ LOG(info, "Found no mapping for '%s'", slobrok_id.c_str());
// TODO return potentially stale existing target if no longer existing in SB?
// TODO or clear any existing mapping?
return {};
}
- const auto& candidate_spec = specs[0].second; // Always use first spec in list. TODO correct?
+ // Note: We don't use wildcards so there is a 1-to-1 mapping between service name / slobrok id and connection spec.
+ assert(specs.size() == 1);
+ const auto& connection_spec = specs[0].second;
std::unique_lock lock(_targets_rwmutex);
- // If address has the same spec as the existing target, just reuse it.
- auto target_iter = _targets.find(sb_id);
- if ((target_iter != _targets.end())
- && (target_iter->second->_target->IsValid())
- && (target_iter->second->_spec == candidate_spec))
- {
- LOG(info, "Updating existing mapping %s -> %s (gen %u) to gen %u",
- sb_id.c_str(), candidate_spec.c_str(), target_iter->second->_sb_generation, current_sb_gen);
- target_iter->second->_sb_generation = current_sb_gen;
- return target_iter->second;
+ if (auto result = consider_update_target(slobrok_id, connection_spec, curr_slobrok_gen, lock)) {
+ return result;
}
- // Insert new mapping or update the old one.
- auto* raw_target = _rpc_resources.supervisor().GetTarget(candidate_spec.c_str()); // TODO expensive inside lock?
- assert(raw_target);
- auto rpc_target = std::make_shared<RpcTarget>(raw_target, candidate_spec, current_sb_gen);
- _targets[sb_id] = rpc_target;
- LOG(info, "Added mapping %s -> %s at gen %u", sb_id.c_str(), candidate_spec.c_str(), current_sb_gen);
- return rpc_target;
+ return insert_new_target_mapping(slobrok_id, connection_spec, curr_slobrok_gen, lock);
}
-
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h
index ac91019806b..e9a2aece4c5 100644
--- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h
+++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h
@@ -2,29 +2,50 @@
#pragma once
#include "rpc_target.h"
+#include "rpc_target_factory.h"
#include <vespa/vespalib/stllike/hash_map.h>
#include <memory>
#include <shared_mutex>
+namespace slobrok::api { class IMirrorAPI; }
+
namespace storage {
namespace api { class StorageMessageAddress; }
namespace rpc {
-class SharedRpcResources;
-
+/**
+ * Class that resolves and caches rpc targets based on StorageMessageAddress that is mapped to slobrok id,
+ * with lookup in a slobrok mirror.
+ */
class CachingRpcTargetResolver {
- SharedRpcResources& _rpc_resources;
+private:
+ const slobrok::api::IMirrorAPI& _slobrok_mirror;
+ const RpcTargetFactory& _target_factory;
+ using UniqueLock = std::unique_lock<std::shared_mutex>;
mutable std::shared_mutex _targets_rwmutex;
// TODO LRU? Size cap?
vespalib::hash_map<vespalib::string, std::shared_ptr<RpcTarget>> _targets;
+
+ std::shared_ptr<RpcTarget> lookup_target(const vespalib::string& slobrok_id, uint32_t curr_slobrok_gen);
+ std::shared_ptr<RpcTarget> consider_update_target(const vespalib::string& slobrok_id,
+ const vespalib::string& connection_spec,
+ uint32_t curr_slobrok_gen,
+ const UniqueLock& targets_lock);
+
+ std::shared_ptr<RpcTarget> insert_new_target_mapping(const vespalib::string& slobrok_id,
+ const vespalib::string& connection_spec,
+ uint32_t curr_slobrok_gen,
+ const UniqueLock& targets_lock);
+
public:
- // TODO pass explicit slobrok mirror interface and supervisor to make testing easier
- // TODO consider wrapping supervisor to make testing easier
- explicit CachingRpcTargetResolver(SharedRpcResources& rpc_resources);
+ explicit CachingRpcTargetResolver(const slobrok::api::IMirrorAPI& slobrok_mirror,
+ const RpcTargetFactory& target_factory);
~CachingRpcTargetResolver();
+ static vespalib::string address_to_slobrok_id(const api::StorageMessageAddress& address);
+
std::shared_ptr<RpcTarget> resolve_rpc_target(const api::StorageMessageAddress& address);
};
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp b/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp
index ddf881d3bd3..5fcce1e2a6d 100644
--- a/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target.cpp
@@ -1,17 +1,14 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "rpc_target.h"
-#include <vespa/fnet/frt/target.h>
namespace storage::rpc {
-RpcTarget::RpcTarget(FRT_Target* target, vespalib::stringref spec, uint32_t sb_generation)
- : _target(target),
+RpcTarget::RpcTarget(std::unique_ptr<WrappedFrtTarget> target, vespalib::stringref spec, uint32_t slobrok_gen)
+ : _target(std::move(target)),
_spec(spec),
- _sb_generation(sb_generation)
+ _slobrok_gen(slobrok_gen)
{}
-RpcTarget::~RpcTarget() {
- _target->SubRef();
-}
+RpcTarget::~RpcTarget() = default;
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target.h
index 7fd1ddeafaf..52356454685 100644
--- a/storage/src/vespa/storage/storageserver/rpc/rpc_target.h
+++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target.h
@@ -8,15 +8,24 @@ class FRT_Target;
namespace storage::rpc {
+/**
+ * Simple wrapper API to access a FRT_Target.
+ */
+class WrappedFrtTarget {
+public:
+ virtual ~WrappedFrtTarget() = default;
+ virtual FRT_Target* get() = 0;
+ virtual bool is_valid() const = 0;
+};
+
struct RpcTarget {
- FRT_Target* _target;
+ std::unique_ptr<WrappedFrtTarget> _target;
const vespalib::string _spec;
- uint32_t _sb_generation;
+ uint32_t _slobrok_gen;
- // Target must have ref count of at least 1
- RpcTarget(FRT_Target* target,
+ RpcTarget(std::unique_ptr<WrappedFrtTarget> target,
vespalib::stringref spec,
- uint32_t sb_generation);
+ uint32_t slobrok_gen);
RpcTarget(const RpcTarget&) = delete;
RpcTarget& operator=(const RpcTarget&) = delete;
RpcTarget(RpcTarget&&) = delete;
diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h
new file mode 100644
index 00000000000..20744d94a1f
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h
@@ -0,0 +1,21 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+#include <memory>
+
+namespace storage::rpc {
+
+class RpcTarget;
+
+/**
+ * Factory for creating instances of RpcTarget based on a connection spec.
+ */
+class RpcTargetFactory {
+public:
+ virtual ~RpcTargetFactory() {}
+ virtual std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec, uint32_t slobrok_gen) const = 0;
+};
+
+}
+
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
index 4ce1732c6f8..03575987560 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -1,7 +1,9 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "rpc_target.h"
#include "shared_rpc_resources.h"
#include <vespa/fastos/thread.h>
#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
#include <vespa/fnet/transport.h>
#include <vespa/slobrok/sbregister.h>
#include <vespa/slobrok/sbmirror.h>
@@ -18,6 +20,43 @@ using namespace std::chrono_literals;
namespace storage::rpc {
+namespace {
+
+class WrappedFrtTargetImpl : public WrappedFrtTarget {
+private:
+ FRT_Target* _target;
+
+public:
+ WrappedFrtTargetImpl(FRT_Target* target)
+ : _target(target)
+ {}
+ ~WrappedFrtTargetImpl() override {
+ _target->SubRef();
+ }
+ FRT_Target* get() override { return _target; }
+ bool is_valid() const override { return _target->IsValid(); }
+};
+
+}
+
+class SharedRpcResources::RpcTargetFactoryImpl : public RpcTargetFactory {
+private:
+ FRT_Supervisor& _orb;
+
+public:
+ RpcTargetFactoryImpl(FRT_Supervisor& orb)
+ : _orb(orb)
+ {}
+ std::unique_ptr<RpcTarget> make_target(const vespalib::string& connection_spec, uint32_t slobrok_gen) const override {
+ auto* raw_target = _orb.GetTarget(connection_spec.c_str());
+ if (raw_target) {
+ return std::make_unique<RpcTarget>
+ (std::make_unique<WrappedFrtTargetImpl>(raw_target), connection_spec, slobrok_gen);
+ }
+ return std::unique_ptr<RpcTarget>();
+ }
+};
+
SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri,
int rpc_server_port,
size_t rpc_thread_pool_size)
@@ -26,6 +65,7 @@ SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri,
_orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_slobrok_register(std::make_unique<slobrok::api::RegisterAPI>(*_orb, config_uri)),
_slobrok_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, config_uri)),
+ _target_factory(std::make_unique<RpcTargetFactoryImpl>(*_orb)),
_rpc_server_port(rpc_server_port),
_shutdown(false)
{
@@ -71,4 +111,8 @@ int SharedRpcResources::listen_port() const noexcept {
return _orb->GetListenPort();
}
+const RpcTargetFactory& SharedRpcResources::target_factory() const {
+ return *_target_factory;
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
index 9b2f1c04249..bd682612d20 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "rpc_target_factory.h"
#include <vespa/config/subscription/configuri.h>
#include <vespa/vespalib/stllike/string.h>
#include <memory>
@@ -17,11 +18,13 @@ class MirrorAPI;
namespace storage::rpc {
class SharedRpcResources {
+ class RpcTargetFactoryImpl;
std::unique_ptr<FastOS_ThreadPool> _thread_pool;
std::unique_ptr<FNET_Transport> _transport;
std::unique_ptr<FRT_Supervisor> _orb;
std::unique_ptr<slobrok::api::RegisterAPI> _slobrok_register;
std::unique_ptr<slobrok::api::MirrorAPI> _slobrok_mirror;
+ std::unique_ptr<RpcTargetFactoryImpl> _target_factory;
vespalib::string _handle;
int _rpc_server_port;
bool _shutdown;
@@ -41,6 +44,8 @@ public:
void shutdown();
[[nodiscard]] int listen_port() const noexcept; // Only valid if server has been started
+
+ const RpcTargetFactory& target_factory() const;
private:
void wait_until_slobrok_is_ready();
};
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index 8bce6210e55..2a4ff2512bf 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -28,7 +28,7 @@ StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher
: _message_dispatcher(message_dispatcher),
_rpc_resources(rpc_resources),
_message_codec_provider(message_codec_provider),
- _target_resolver(std::make_unique<CachingRpcTargetResolver>(rpc_resources))
+ _target_resolver(std::make_unique<CachingRpcTargetResolver>(_rpc_resources.slobrok_mirror(), _rpc_resources.target_factory()))
{
register_server_methods(rpc_resources);
}
@@ -212,7 +212,7 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
auto& req_ctx = req->getStash().create<RpcRequestContext>(std::move(cmd), timeout);
req->SetContext(FNET_Context(&req_ctx));
- target->_target->InvokeAsync(req.release(), vespalib::to_s(timeout), this);
+ target->_target->get()->InvokeAsync(req.release(), vespalib::to_s(timeout), this);
}
namespace {