aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-09-10 15:16:42 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-09-14 13:22:15 +0000
commit06a0f822bc6f90e64e6d8510e1b3f5ef3cc037ab (patch)
tree56aaac5105874d8b1dcf3d2f7b765866f7163ff2 /storage/src
parentd4cc122432a835a18588c053d3f3f2043b92b831 (diff)
Add thread-safe encapsulation of protocol codec and live dependency updates
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/tests/storageserver/rpc/CMakeLists.txt1
-rw-r--r--storage/src/tests/storageserver/rpc/message_codec_provider_test.cpp46
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp21
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h2
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/message_codec_provider.cpp38
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/message_codec_provider.h47
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp25
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h18
9 files changed, 161 insertions, 38 deletions
diff --git a/storage/src/tests/storageserver/rpc/CMakeLists.txt b/storage/src/tests/storageserver/rpc/CMakeLists.txt
index ae7128b7484..f240bb3a6c7 100644
--- a/storage/src/tests/storageserver/rpc/CMakeLists.txt
+++ b/storage/src/tests/storageserver/rpc/CMakeLists.txt
@@ -2,6 +2,7 @@
vespa_add_executable(storage_storageserver_rpc_gtest_runner_app TEST
SOURCES
cluster_controller_rpc_api_service_test.cpp
+ message_codec_provider_test.cpp
gtest_runner.cpp
DEPENDS
storage_storageserver
diff --git a/storage/src/tests/storageserver/rpc/message_codec_provider_test.cpp b/storage/src/tests/storageserver/rpc/message_codec_provider_test.cpp
new file mode 100644
index 00000000000..0d4e3b8df93
--- /dev/null
+++ b/storage/src/tests/storageserver/rpc/message_codec_provider_test.cpp
@@ -0,0 +1,46 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/storage/storageserver/rpc/message_codec_provider.h>
+#include <vespa/storageapi/mbusprot/protocolserialization7.h>
+#include <vespa/document/base/testdocman.h>
+#include <vespa/documentapi/loadtypes/loadtypeset.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace ::testing;
+
+namespace storage::rpc {
+
+struct MessageCodecProviderTest : Test {
+ std::shared_ptr<const document::DocumentTypeRepo> _repo1;
+ std::shared_ptr<const document::DocumentTypeRepo> _repo2;
+ std::shared_ptr<const documentapi::LoadTypeSet> _load_types1;
+ std::shared_ptr<const documentapi::LoadTypeSet> _load_types2;
+ MessageCodecProvider _provider;
+
+ // We don't care about repo/set contents, just their pointer identities
+ MessageCodecProviderTest()
+ : _repo1(document::TestDocRepo().getTypeRepoSp()),
+ _repo2(document::TestDocRepo().getTypeRepoSp()),
+ _load_types1(std::make_shared<documentapi::LoadTypeSet>()),
+ _load_types2(std::make_shared<documentapi::LoadTypeSet>()),
+ _provider(_repo1, _load_types1)
+ {}
+ ~MessageCodecProviderTest() override;
+};
+
+MessageCodecProviderTest::~MessageCodecProviderTest() = default;
+
+TEST_F(MessageCodecProviderTest, initially_provides_constructed_repos) {
+ auto wrapped = _provider.wrapped_codec();
+ EXPECT_EQ(&wrapped->codec().type_repo(), _repo1.get());
+ EXPECT_EQ(&wrapped->codec().load_type_set(), _load_types1.get());
+}
+
+TEST_F(MessageCodecProviderTest, updated_repos_reflected_in_new_wrapped_codec) {
+ _provider.update_atomically(_repo2, _load_types2);
+
+ auto wrapped = _provider.wrapped_codec();
+ EXPECT_EQ(&wrapped->codec().type_repo(), _repo2.get());
+ EXPECT_EQ(&wrapped->codec().load_type_set(), _load_types2.get());
+}
+
+}
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 2fd82a40472..397623dc491 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -1,10 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "communicationmanager.h"
-#include "vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h"
-#include "vespa/storage/storageserver/rpc/storage_api_rpc_service.h"
#include "rpcrequestwrapper.h"
-#include "vespa/storage/storageserver/rpc/shared_rpc_resources.h"
#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h>
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/network/rpcnetworkparams.h>
@@ -13,6 +10,10 @@
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storage/storageserver/configurable_bucket_resolver.h>
+#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
+#include <vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h>
+#include <vespa/storage/storageserver/rpc/message_codec_provider.h>
+#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/vespalib/stllike/asciistream.h>
@@ -419,15 +420,14 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
configureMessageBusLimits(*config);
}
- // TODO temporary!
- auto repo_getter = [this]{ return _component.getTypeRepo()->documentTypeRepo; };
- auto loadtype_getter = [this]{ return _component.getLoadTypes(); };
-
_use_direct_storageapi_rpc = config->useDirectStorageapiRpc;
+ _message_codec_provider = std::make_unique<rpc::MessageCodecProvider>(_component.getTypeRepo()->documentTypeRepo,
+ _component.getLoadTypes());
// TODO configurable thread pool size
_shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport, 1/*pool size*/);
_cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources);
- _storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>(*this, *_shared_rpc_resources, std::move(repo_getter), std::move(loadtype_getter));
+ _storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>(
+ *this, *_shared_rpc_resources, *_message_codec_provider);
if (_mbus) {
mbus::DestinationSessionParams dstParams;
@@ -780,7 +780,7 @@ CommunicationManager::print(std::ostream& out, bool verbose, const std::string&
}
void CommunicationManager::updateMessagebusProtocol(
- const std::shared_ptr<const document::DocumentTypeRepo> &repo) {
+ const std::shared_ptr<const document::DocumentTypeRepo>& repo) {
if (_mbus) {
framework::SecondTime now(_component.getClock().getTimeInSeconds());
auto newDocumentProtocol = std::make_shared<documentapi::DocumentProtocol>(*_component.getLoadTypes(), repo);
@@ -789,6 +789,9 @@ void CommunicationManager::updateMessagebusProtocol(
auto newStorageProtocol = std::make_shared<mbusprot::StorageProtocol>(repo, *_component.getLoadTypes());
_earlierGenerations.push_back(std::make_pair(now, _mbus->getMessageBus().putProtocol(newStorageProtocol)));
}
+ if (_message_codec_provider) {
+ _message_codec_provider->update_atomically(repo, _component.getLoadTypes());
+ }
}
void CommunicationManager::updateBucketSpacesConfig(const BucketspacesConfig& config) {
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 1a3d2081c47..7ac9d575ee6 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -39,6 +39,7 @@ namespace storage {
namespace rpc {
class ClusterControllerApiRpcService;
+class MessageCodecProvider;
class SharedRpcResources;
class StorageApiRpcService;
}
@@ -79,6 +80,7 @@ private:
std::unique_ptr<rpc::SharedRpcResources> _shared_rpc_resources;
std::unique_ptr<rpc::StorageApiRpcService> _storage_api_rpc_service;
std::unique_ptr<rpc::ClusterControllerApiRpcService> _cc_rpc_service;
+ std::unique_ptr<rpc::MessageCodecProvider> _message_codec_provider;
Queue _eventQueue;
// XXX: Should perhaps use a configsubscriber and poll from StorageComponent ?
std::unique_ptr<config::ConfigFetcher> _configFetcher;
diff --git a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt
index f42656068f6..be0e83e89cc 100644
--- a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt
+++ b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt
@@ -15,6 +15,7 @@ vespa_add_library(storage_storageserver_rpc OBJECT
SOURCES
caching_rpc_target_resolver.cpp
cluster_controller_api_rpc_service.cpp
+ message_codec_provider.cpp
rpc_target.cpp
shared_rpc_resources.cpp
slime_cluster_state_bundle_codec.cpp
diff --git a/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.cpp b/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.cpp
new file mode 100644
index 00000000000..90ea4291a30
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.cpp
@@ -0,0 +1,38 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "message_codec_provider.h"
+#include <vespa/storageapi/mbusprot/protocolserialization7.h>
+
+namespace storage::rpc {
+
+WrappedCodec::WrappedCodec(std::shared_ptr<const document::DocumentTypeRepo> doc_type_repo,
+ std::shared_ptr<const documentapi::LoadTypeSet> load_type_set)
+ : _doc_type_repo(std::move(doc_type_repo)),
+ _load_type_set(std::move(load_type_set)),
+ _codec(std::make_unique<mbusprot::ProtocolSerialization7>(_doc_type_repo, *_load_type_set))
+{
+}
+
+WrappedCodec::~WrappedCodec() = default;
+
+MessageCodecProvider::MessageCodecProvider(std::shared_ptr<const document::DocumentTypeRepo> doc_type_repo,
+ std::shared_ptr<const documentapi::LoadTypeSet> load_type_set)
+ : _rw_mutex(),
+ _active_codec(std::make_shared<WrappedCodec>(std::move(doc_type_repo), std::move(load_type_set)))
+{
+}
+
+MessageCodecProvider::~MessageCodecProvider() = default;
+
+std::shared_ptr<const WrappedCodec> MessageCodecProvider::wrapped_codec() const noexcept {
+ std::shared_lock r_lock(_rw_mutex);
+ return _active_codec;
+}
+
+void MessageCodecProvider::update_atomically(std::shared_ptr<const document::DocumentTypeRepo> doc_type_repo,
+ std::shared_ptr<const documentapi::LoadTypeSet> load_type_set)
+{
+ std::unique_lock w_lock(_rw_mutex);
+ _active_codec = std::make_shared<WrappedCodec>(std::move(doc_type_repo), std::move(load_type_set));
+}
+
+}
diff --git a/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.h b/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.h
new file mode 100644
index 00000000000..fdadfd6f910
--- /dev/null
+++ b/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.h
@@ -0,0 +1,47 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <memory>
+#include <shared_mutex>
+
+namespace document { class DocumentTypeRepo; }
+namespace documentapi { class LoadTypeSet; }
+namespace storage::mbusprot { class ProtocolSerialization7; }
+
+namespace storage::rpc {
+
+class WrappedCodec {
+ const std::shared_ptr<const document::DocumentTypeRepo> _doc_type_repo;
+ const std::shared_ptr<const documentapi::LoadTypeSet> _load_type_set;
+ std::unique_ptr<mbusprot::ProtocolSerialization7> _codec;
+public:
+ WrappedCodec(std::shared_ptr<const document::DocumentTypeRepo> doc_type_repo,
+ std::shared_ptr<const documentapi::LoadTypeSet> load_type_set);
+ ~WrappedCodec();
+
+ [[nodiscard]] const mbusprot::ProtocolSerialization7& codec() const noexcept { return *_codec; }
+};
+
+/**
+ * Thread-safe wrapper around a protocol serialization codec and its transitive
+ * dependencies. Effectively provides support for setting and getting an immutable
+ * codec snapshot that can be used for RPC (de-)serialization.
+ */
+class MessageCodecProvider {
+ // TODO replace with std::atomic<std::shared_ptr<WrappedCodec>> once on a sufficiently new
+ // C++20 STL that implements the P0718R2 proposal. We expect(tm) an implementation to use
+ // lock-free compiler-specific 128-bit CAS atomics instead of explicit locks there.
+ mutable std::shared_mutex _rw_mutex;
+ std::shared_ptr<WrappedCodec> _active_codec;
+public:
+ MessageCodecProvider(std::shared_ptr<const document::DocumentTypeRepo> doc_type_repo,
+ std::shared_ptr<const documentapi::LoadTypeSet> load_type_set);
+ ~MessageCodecProvider();
+
+ [[nodiscard]] std::shared_ptr<const WrappedCodec> wrapped_codec() const noexcept;
+
+ void update_atomically(std::shared_ptr<const document::DocumentTypeRepo> doc_type_repo,
+ std::shared_ptr<const documentapi::LoadTypeSet> load_type_set);
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index 43d366062b9..8bce6210e55 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "storage_api_rpc_service.h"
#include "caching_rpc_target_resolver.h"
+#include "message_codec_provider.h"
#include "shared_rpc_resources.h"
#include "rpc_envelope.pb.h"
#include <vespa/fnet/frt/supervisor.h>
@@ -23,13 +24,10 @@ namespace storage::rpc {
StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher,
SharedRpcResources& rpc_resources,
- std::function<std::shared_ptr<const document::DocumentTypeRepo>()> doctype_repo_func,
- std::function<std::shared_ptr<documentapi::LoadTypeSet>()> loadtype_set_func)
+ MessageCodecProvider& message_codec_provider)
: _message_dispatcher(message_dispatcher),
_rpc_resources(rpc_resources),
- // TODO these are temporary, need to be consolidated and moved out
- _doctype_repo_func(std::move(doctype_repo_func)),
- _loadtype_set_func(std::move(loadtype_set_func)),
+ _message_codec_provider(message_codec_provider),
_target_resolver(std::make_unique<CachingRpcTargetResolver>(rpc_resources))
{
register_server_methods(rpc_resources);
@@ -117,12 +115,8 @@ void compress_and_add_payload_to_rpc_params(mbus::BlobRef payload, FRT_Values& p
template <typename MessageType>
void StorageApiRpcService::encode_and_compress_rpc_payload(const MessageType& msg, FRT_Values& params) {
- // Shared ptrs must stay alive
- // TODO move out, cache
- auto repo = _doctype_repo_func();
- auto load_types = _loadtype_set_func();
- mbusprot::ProtocolSerialization7 codec(repo, *load_types);
- auto payload = codec.encode(msg);
+ auto wrapped_codec = _message_codec_provider.wrapped_codec();
+ auto payload = wrapped_codec->codec().encode(msg);
compress_and_add_payload_to_rpc_params(payload, params);
}
@@ -140,14 +134,9 @@ void StorageApiRpcService::uncompress_rpc_payload(
decompress(compression_type, uncompressed_length, blob, uncompressed, true);
assert(uncompressed_length == uncompressed.getDataLen());
assert(uncompressed_length <= UINT32_MAX);
+ auto wrapped_codec = _message_codec_provider.wrapped_codec();
- // Shared ptrs must stay alive
- // TODO move out, cache
- auto repo = _doctype_repo_func();
- auto loadtypes = _loadtype_set_func();
- mbusprot::ProtocolSerialization7 codec(repo, *loadtypes);
-
- payload_callback(codec, mbus::BlobRef(uncompressed.getData(), uncompressed_length));
+ payload_callback(wrapped_codec->codec(), mbus::BlobRef(uncompressed.getData(), uncompressed_length));
}
void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) {
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
index a1bb4008cbb..7ffa111fa60 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -6,7 +6,6 @@
#include <vespa/fnet/frt/invoker.h>
#include <vespa/vespalib/stllike/string.h>
#include <atomic>
-#include <functional>
#include <memory>
class FRT_RPCRequest;
@@ -28,18 +27,19 @@ class StorageReply;
namespace rpc {
-class SharedRpcResources;
class CachingRpcTargetResolver;
+class MessageCodecProvider;
+class SharedRpcResources;
class StorageApiRpcService : public FRT_Invokable, public FRT_IRequestWait {
- MessageDispatcher& _message_dispatcher;
- SharedRpcResources& _rpc_resources;
+ MessageDispatcher& _message_dispatcher;
+ SharedRpcResources& _rpc_resources;
+ MessageCodecProvider& _message_codec_provider;
+ std::unique_ptr<CachingRpcTargetResolver> _target_resolver;
public:
StorageApiRpcService(MessageDispatcher& message_dispatcher,
SharedRpcResources& rpc_resources,
- // TODO temporary!
- std::function<std::shared_ptr<const document::DocumentTypeRepo>()> doctype_repo_func,
- std::function<std::shared_ptr<documentapi::LoadTypeSet>()> loadtype_set_func);
+ MessageCodecProvider& message_codec_provider);
~StorageApiRpcService() override;
void RPC_rpc_v1_send(FRT_RPCRequest* req);
@@ -59,10 +59,6 @@ private:
{}
};
- std::function<std::shared_ptr<const document::DocumentTypeRepo>()> _doctype_repo_func;
- std::function<std::shared_ptr<documentapi::LoadTypeSet>()> _loadtype_set_func;
- std::unique_ptr<CachingRpcTargetResolver> _target_resolver;
-
void register_server_methods(SharedRpcResources&);
template <typename PayloadCodecCallback>
void uncompress_rpc_payload(const FRT_Values& params, PayloadCodecCallback payload_callback);