diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-10 15:16:42 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-14 13:22:15 +0000 |
commit | 06a0f822bc6f90e64e6d8510e1b3f5ef3cc037ab (patch) | |
tree | 56aaac5105874d8b1dcf3d2f7b765866f7163ff2 /storage | |
parent | d4cc122432a835a18588c053d3f3f2043b92b831 (diff) |
Add thread-safe encapsulation of protocol codec and live dependency updates
Diffstat (limited to 'storage')
9 files changed, 161 insertions, 38 deletions
diff --git a/storage/src/tests/storageserver/rpc/CMakeLists.txt b/storage/src/tests/storageserver/rpc/CMakeLists.txt index ae7128b7484..f240bb3a6c7 100644 --- a/storage/src/tests/storageserver/rpc/CMakeLists.txt +++ b/storage/src/tests/storageserver/rpc/CMakeLists.txt @@ -2,6 +2,7 @@ vespa_add_executable(storage_storageserver_rpc_gtest_runner_app TEST SOURCES cluster_controller_rpc_api_service_test.cpp + message_codec_provider_test.cpp gtest_runner.cpp DEPENDS storage_storageserver diff --git a/storage/src/tests/storageserver/rpc/message_codec_provider_test.cpp b/storage/src/tests/storageserver/rpc/message_codec_provider_test.cpp new file mode 100644 index 00000000000..0d4e3b8df93 --- /dev/null +++ b/storage/src/tests/storageserver/rpc/message_codec_provider_test.cpp @@ -0,0 +1,46 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/storage/storageserver/rpc/message_codec_provider.h> +#include <vespa/storageapi/mbusprot/protocolserialization7.h> +#include <vespa/document/base/testdocman.h> +#include <vespa/documentapi/loadtypes/loadtypeset.h> +#include <vespa/vespalib/gtest/gtest.h> + +using namespace ::testing; + +namespace storage::rpc { + +struct MessageCodecProviderTest : Test { + std::shared_ptr<const document::DocumentTypeRepo> _repo1; + std::shared_ptr<const document::DocumentTypeRepo> _repo2; + std::shared_ptr<const documentapi::LoadTypeSet> _load_types1; + std::shared_ptr<const documentapi::LoadTypeSet> _load_types2; + MessageCodecProvider _provider; + + // We don't care about repo/set contents, just their pointer identities + MessageCodecProviderTest() + : _repo1(document::TestDocRepo().getTypeRepoSp()), + _repo2(document::TestDocRepo().getTypeRepoSp()), + _load_types1(std::make_shared<documentapi::LoadTypeSet>()), + _load_types2(std::make_shared<documentapi::LoadTypeSet>()), + _provider(_repo1, _load_types1) + {} + ~MessageCodecProviderTest() override; +}; + +MessageCodecProviderTest::~MessageCodecProviderTest() = default; + +TEST_F(MessageCodecProviderTest, initially_provides_constructed_repos) { + auto wrapped = _provider.wrapped_codec(); + EXPECT_EQ(&wrapped->codec().type_repo(), _repo1.get()); + EXPECT_EQ(&wrapped->codec().load_type_set(), _load_types1.get()); +} + +TEST_F(MessageCodecProviderTest, updated_repos_reflected_in_new_wrapped_codec) { + _provider.update_atomically(_repo2, _load_types2); + + auto wrapped = _provider.wrapped_codec(); + EXPECT_EQ(&wrapped->codec().type_repo(), _repo2.get()); + EXPECT_EQ(&wrapped->codec().load_type_set(), _load_types2.get()); +} + +} diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 2fd82a40472..397623dc491 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -1,10 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "communicationmanager.h" -#include "vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h" -#include "vespa/storage/storageserver/rpc/storage_api_rpc_service.h" #include "rpcrequestwrapper.h" -#include "vespa/storage/storageserver/rpc/shared_rpc_resources.h" #include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h> #include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/network/rpcnetworkparams.h> @@ -13,6 +10,10 @@ #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/storageserver/configurable_bucket_resolver.h> +#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> +#include <vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h> +#include <vespa/storage/storageserver/rpc/message_codec_provider.h> +#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h> #include <vespa/storageapi/message/state.h> #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/vespalib/stllike/asciistream.h> @@ -419,15 +420,14 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> configureMessageBusLimits(*config); } - // TODO temporary! - auto repo_getter = [this]{ return _component.getTypeRepo()->documentTypeRepo; }; - auto loadtype_getter = [this]{ return _component.getLoadTypes(); }; - _use_direct_storageapi_rpc = config->useDirectStorageapiRpc; + _message_codec_provider = std::make_unique<rpc::MessageCodecProvider>(_component.getTypeRepo()->documentTypeRepo, + _component.getLoadTypes()); // TODO configurable thread pool size _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport, 1/*pool size*/); _cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources); - _storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>(*this, *_shared_rpc_resources, std::move(repo_getter), std::move(loadtype_getter)); + _storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>( + *this, *_shared_rpc_resources, *_message_codec_provider); if (_mbus) { mbus::DestinationSessionParams dstParams; @@ -780,7 +780,7 @@ CommunicationManager::print(std::ostream& out, bool verbose, const std::string& } void CommunicationManager::updateMessagebusProtocol( - const std::shared_ptr<const document::DocumentTypeRepo> &repo) { + const std::shared_ptr<const document::DocumentTypeRepo>& repo) { if (_mbus) { framework::SecondTime now(_component.getClock().getTimeInSeconds()); auto newDocumentProtocol = std::make_shared<documentapi::DocumentProtocol>(*_component.getLoadTypes(), repo); @@ -789,6 +789,9 @@ void CommunicationManager::updateMessagebusProtocol( auto newStorageProtocol = std::make_shared<mbusprot::StorageProtocol>(repo, *_component.getLoadTypes()); _earlierGenerations.push_back(std::make_pair(now, _mbus->getMessageBus().putProtocol(newStorageProtocol))); } + if (_message_codec_provider) { + _message_codec_provider->update_atomically(repo, _component.getLoadTypes()); + } } void CommunicationManager::updateBucketSpacesConfig(const BucketspacesConfig& config) { diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 1a3d2081c47..7ac9d575ee6 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -39,6 +39,7 @@ namespace storage { namespace rpc { class ClusterControllerApiRpcService; +class MessageCodecProvider; class SharedRpcResources; class StorageApiRpcService; } @@ -79,6 +80,7 @@ private: std::unique_ptr<rpc::SharedRpcResources> _shared_rpc_resources; std::unique_ptr<rpc::StorageApiRpcService> _storage_api_rpc_service; std::unique_ptr<rpc::ClusterControllerApiRpcService> _cc_rpc_service; + std::unique_ptr<rpc::MessageCodecProvider> _message_codec_provider; Queue _eventQueue; // XXX: Should perhaps use a configsubscriber and poll from StorageComponent ? std::unique_ptr<config::ConfigFetcher> _configFetcher; diff --git a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt index f42656068f6..be0e83e89cc 100644 --- a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt +++ b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt @@ -15,6 +15,7 @@ vespa_add_library(storage_storageserver_rpc OBJECT SOURCES caching_rpc_target_resolver.cpp cluster_controller_api_rpc_service.cpp + message_codec_provider.cpp rpc_target.cpp shared_rpc_resources.cpp slime_cluster_state_bundle_codec.cpp diff --git a/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.cpp b/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.cpp new file mode 100644 index 00000000000..90ea4291a30 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.cpp @@ -0,0 +1,38 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "message_codec_provider.h" +#include <vespa/storageapi/mbusprot/protocolserialization7.h> + +namespace storage::rpc { + +WrappedCodec::WrappedCodec(std::shared_ptr<const document::DocumentTypeRepo> doc_type_repo, + std::shared_ptr<const documentapi::LoadTypeSet> load_type_set) + : _doc_type_repo(std::move(doc_type_repo)), + _load_type_set(std::move(load_type_set)), + _codec(std::make_unique<mbusprot::ProtocolSerialization7>(_doc_type_repo, *_load_type_set)) +{ +} + +WrappedCodec::~WrappedCodec() = default; + +MessageCodecProvider::MessageCodecProvider(std::shared_ptr<const document::DocumentTypeRepo> doc_type_repo, + std::shared_ptr<const documentapi::LoadTypeSet> load_type_set) + : _rw_mutex(), + _active_codec(std::make_shared<WrappedCodec>(std::move(doc_type_repo), std::move(load_type_set))) +{ +} + +MessageCodecProvider::~MessageCodecProvider() = default; + +std::shared_ptr<const WrappedCodec> MessageCodecProvider::wrapped_codec() const noexcept { + std::shared_lock r_lock(_rw_mutex); + return _active_codec; +} + +void MessageCodecProvider::update_atomically(std::shared_ptr<const document::DocumentTypeRepo> doc_type_repo, + std::shared_ptr<const documentapi::LoadTypeSet> load_type_set) +{ + std::unique_lock w_lock(_rw_mutex); + _active_codec = std::make_shared<WrappedCodec>(std::move(doc_type_repo), std::move(load_type_set)); +} + +} diff --git a/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.h b/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.h new file mode 100644 index 00000000000..fdadfd6f910 --- /dev/null +++ b/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.h @@ -0,0 +1,47 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <memory> +#include <shared_mutex> + +namespace document { class DocumentTypeRepo; } +namespace documentapi { class LoadTypeSet; } +namespace storage::mbusprot { class ProtocolSerialization7; } + +namespace storage::rpc { + +class WrappedCodec { + const std::shared_ptr<const document::DocumentTypeRepo> _doc_type_repo; + const std::shared_ptr<const documentapi::LoadTypeSet> _load_type_set; + std::unique_ptr<mbusprot::ProtocolSerialization7> _codec; +public: + WrappedCodec(std::shared_ptr<const document::DocumentTypeRepo> doc_type_repo, + std::shared_ptr<const documentapi::LoadTypeSet> load_type_set); + ~WrappedCodec(); + + [[nodiscard]] const mbusprot::ProtocolSerialization7& codec() const noexcept { return *_codec; } +}; + +/** + * Thread-safe wrapper around a protocol serialization codec and its transitive + * dependencies. Effectively provides support for setting and getting an immutable + * codec snapshot that can be used for RPC (de-)serialization. + */ +class MessageCodecProvider { + // TODO replace with std::atomic<std::shared_ptr<WrappedCodec>> once on a sufficiently new + // C++20 STL that implements the P0718R2 proposal. We expect(tm) an implementation to use + // lock-free compiler-specific 128-bit CAS atomics instead of explicit locks there. + mutable std::shared_mutex _rw_mutex; + std::shared_ptr<WrappedCodec> _active_codec; +public: + MessageCodecProvider(std::shared_ptr<const document::DocumentTypeRepo> doc_type_repo, + std::shared_ptr<const documentapi::LoadTypeSet> load_type_set); + ~MessageCodecProvider(); + + [[nodiscard]] std::shared_ptr<const WrappedCodec> wrapped_codec() const noexcept; + + void update_atomically(std::shared_ptr<const document::DocumentTypeRepo> doc_type_repo, + std::shared_ptr<const documentapi::LoadTypeSet> load_type_set); +}; + +} diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp index 43d366062b9..8bce6210e55 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "storage_api_rpc_service.h" #include "caching_rpc_target_resolver.h" +#include "message_codec_provider.h" #include "shared_rpc_resources.h" #include "rpc_envelope.pb.h" #include <vespa/fnet/frt/supervisor.h> @@ -23,13 +24,10 @@ namespace storage::rpc { StorageApiRpcService::StorageApiRpcService(MessageDispatcher& message_dispatcher, SharedRpcResources& rpc_resources, - std::function<std::shared_ptr<const document::DocumentTypeRepo>()> doctype_repo_func, - std::function<std::shared_ptr<documentapi::LoadTypeSet>()> loadtype_set_func) + MessageCodecProvider& message_codec_provider) : _message_dispatcher(message_dispatcher), _rpc_resources(rpc_resources), - // TODO these are temporary, need to be consolidated and moved out - _doctype_repo_func(std::move(doctype_repo_func)), - _loadtype_set_func(std::move(loadtype_set_func)), + _message_codec_provider(message_codec_provider), _target_resolver(std::make_unique<CachingRpcTargetResolver>(rpc_resources)) { register_server_methods(rpc_resources); @@ -117,12 +115,8 @@ void compress_and_add_payload_to_rpc_params(mbus::BlobRef payload, FRT_Values& p template <typename MessageType> void StorageApiRpcService::encode_and_compress_rpc_payload(const MessageType& msg, FRT_Values& params) { - // Shared ptrs must stay alive - // TODO move out, cache - auto repo = _doctype_repo_func(); - auto load_types = _loadtype_set_func(); - mbusprot::ProtocolSerialization7 codec(repo, *load_types); - auto payload = codec.encode(msg); + auto wrapped_codec = _message_codec_provider.wrapped_codec(); + auto payload = wrapped_codec->codec().encode(msg); compress_and_add_payload_to_rpc_params(payload, params); } @@ -140,14 +134,9 @@ void StorageApiRpcService::uncompress_rpc_payload( decompress(compression_type, uncompressed_length, blob, uncompressed, true); assert(uncompressed_length == uncompressed.getDataLen()); assert(uncompressed_length <= UINT32_MAX); + auto wrapped_codec = _message_codec_provider.wrapped_codec(); - // Shared ptrs must stay alive - // TODO move out, cache - auto repo = _doctype_repo_func(); - auto loadtypes = _loadtype_set_func(); - mbusprot::ProtocolSerialization7 codec(repo, *loadtypes); - - payload_callback(codec, mbus::BlobRef(uncompressed.getData(), uncompressed_length)); + payload_callback(wrapped_codec->codec(), mbus::BlobRef(uncompressed.getData(), uncompressed_length)); } void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) { diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h index a1bb4008cbb..7ffa111fa60 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h @@ -6,7 +6,6 @@ #include <vespa/fnet/frt/invoker.h> #include <vespa/vespalib/stllike/string.h> #include <atomic> -#include <functional> #include <memory> class FRT_RPCRequest; @@ -28,18 +27,19 @@ class StorageReply; namespace rpc { -class SharedRpcResources; class CachingRpcTargetResolver; +class MessageCodecProvider; +class SharedRpcResources; class StorageApiRpcService : public FRT_Invokable, public FRT_IRequestWait { - MessageDispatcher& _message_dispatcher; - SharedRpcResources& _rpc_resources; + MessageDispatcher& _message_dispatcher; + SharedRpcResources& _rpc_resources; + MessageCodecProvider& _message_codec_provider; + std::unique_ptr<CachingRpcTargetResolver> _target_resolver; public: StorageApiRpcService(MessageDispatcher& message_dispatcher, SharedRpcResources& rpc_resources, - // TODO temporary! - std::function<std::shared_ptr<const document::DocumentTypeRepo>()> doctype_repo_func, - std::function<std::shared_ptr<documentapi::LoadTypeSet>()> loadtype_set_func); + MessageCodecProvider& message_codec_provider); ~StorageApiRpcService() override; void RPC_rpc_v1_send(FRT_RPCRequest* req); @@ -59,10 +59,6 @@ private: {} }; - std::function<std::shared_ptr<const document::DocumentTypeRepo>()> _doctype_repo_func; - std::function<std::shared_ptr<documentapi::LoadTypeSet>()> _loadtype_set_func; - std::unique_ptr<CachingRpcTargetResolver> _target_resolver; - void register_server_methods(SharedRpcResources&); template <typename PayloadCodecCallback> void uncompress_rpc_payload(const FRT_Values& params, PayloadCodecCallback payload_callback); |