summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-09-14 12:41:42 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-09-14 13:22:15 +0000
commit4f1b8b7d2abfc70b72ecc5be2b9a720d22de4947 (patch)
treeb878480ce382d8ae09f7fc6ec1980dd9b07b2db7 /storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
parent0817a6890194989a2f243e54d671252bee2f8920 (diff)
Add test scaffolding for StorageApiRpcService
Sets up full RPC/Slobrok endpoints for two simulated nodes. Add simple end-to-end test for request/response flow between nodes.
Diffstat (limited to 'storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp')
-rw-r--r--storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp202
1 files changed, 202 insertions, 0 deletions
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
new file mode 100644
index 00000000000..01022895c0e
--- /dev/null
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -0,0 +1,202 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/document/base/testdocman.h>
+#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/documentapi/loadtypes/loadtypeset.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/slobrok/sbmirror.h>
+#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
+#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
+#include <vespa/storage/storageserver/rpc/message_codec_provider.h>
+#include <vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h>
+#include <vespa/storage/storageserver/communicationmanager.h>
+#include <vespa/storage/storageserver/rpcrequestwrapper.h>
+#include <vespa/storage/storageserver/message_dispatcher.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <tests/common/testhelper.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <condition_variable>
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <stdexcept>
+#include <string>
+
+#include <thread>
+
+using namespace ::testing;
+using namespace document::test;
+using namespace std::chrono_literals;
+
+namespace storage::rpc {
+
+namespace {
+
+constexpr std::chrono::duration message_timeout = 60s;
+
+class LockingMockOperationDispatcher : public MessageDispatcher {
+ using MessageQueueType = std::deque<std::shared_ptr<api::StorageMessage>>;
+
+ mutable std::mutex _mutex;
+ mutable std::condition_variable _cond;
+ MessageQueueType _enqueued;
+public:
+ LockingMockOperationDispatcher();
+ ~LockingMockOperationDispatcher() override;
+
+ void dispatch_sync(std::shared_ptr<api::StorageMessage> msg) override {
+ std::lock_guard lock(_mutex);
+ _enqueued.emplace_back(std::move(msg));
+ _cond.notify_all();
+ }
+
+ void dispatch_async(std::shared_ptr<api::StorageMessage> msg) override {
+ std::lock_guard lock(_mutex);
+ _enqueued.emplace_back(std::move(msg));
+ _cond.notify_all();
+ }
+
+ [[nodiscard]] bool empty() const noexcept {
+ std::lock_guard lock(_mutex);
+ return _enqueued.empty();
+ }
+
+ void wait_until_n_messages_received(size_t n) const {
+ std::unique_lock lock(_mutex);
+ const auto deadline = std::chrono::steady_clock::now() + message_timeout;
+ if (!_cond.wait_until(lock, deadline, [this, n]{ return (_enqueued.size() == n); })) {
+ throw std::runtime_error("Timed out waiting for message");
+ }
+ }
+
+ [[nodiscard]] std::shared_ptr<api::StorageMessage> pop_first_message() {
+ std::lock_guard lock(_mutex);
+ assert(!_enqueued.empty());
+ auto msg = std::move(_enqueued.front());
+ _enqueued.pop_front();
+ return msg;
+ }
+};
+
+LockingMockOperationDispatcher::LockingMockOperationDispatcher() = default;
+LockingMockOperationDispatcher::~LockingMockOperationDispatcher() = default;
+
+api::StorageMessageAddress make_address(uint16_t node_index, bool is_distributor) {
+ return {"coolcluster", (is_distributor ? lib::NodeType::DISTRIBUTOR : lib::NodeType::STORAGE), node_index};
+}
+
+vespalib::string to_slobrok_id(const api::StorageMessageAddress& address) {
+ // TODO factor out slobrok ID generation code to be independent of resolver?
+ return CachingRpcTargetResolver::address_to_slobrok_id(address);
+}
+
+class StorageApiNode {
+ vdstestlib::DirConfig _config;
+ std::shared_ptr<const document::DocumentTypeRepo> _doc_type_repo;
+ std::shared_ptr<const documentapi::LoadTypeSet> _load_type_set;
+ LockingMockOperationDispatcher _messages;
+ std::unique_ptr<MessageCodecProvider> _codec_provider;
+ std::unique_ptr<SharedRpcResources> _shared_rpc_resources;
+ std::unique_ptr<StorageApiRpcService> _service;
+ api::StorageMessageAddress _node_address;
+ vespalib::string _slobrok_id;
+public:
+ StorageApiNode(uint16_t node_index, bool is_distributor, const mbus::Slobrok& slobrok)
+ : _config(getStandardConfig(true)),
+ _doc_type_repo(document::TestDocRepo().getTypeRepoSp()),
+ _load_type_set(std::make_shared<documentapi::LoadTypeSet>()),
+ _node_address(make_address(node_index, is_distributor)),
+ _slobrok_id(to_slobrok_id(_node_address))
+ {
+ auto& cfg = _config.getConfig("stor-server");
+ cfg.set("node_index", std::to_string(node_index));
+ cfg.set("is_distributor", is_distributor ? "true" : "false");
+ addSlobrokConfig(_config, slobrok);
+
+ _shared_rpc_resources = std::make_unique<SharedRpcResources>(_config.getConfigId(), 0, 1);
+ // TODO make codec provider into interface so we can test decode-failures more easily?
+ _codec_provider = std::make_unique<MessageCodecProvider>(_doc_type_repo, _load_type_set);
+ _service = std::make_unique<StorageApiRpcService>(_messages, *_shared_rpc_resources, *_codec_provider);
+
+ _shared_rpc_resources->start_server_and_register_slobrok(_slobrok_id);
+ // Explicitly wait until we are visible in Slobrok. Just waiting for mirror readiness is not enough.
+ wait_until_visible_in_slobrok(_slobrok_id);
+ }
+
+ void wait_until_visible_in_slobrok(vespalib::stringref id) {
+ while (_shared_rpc_resources->slobrok_mirror().lookup(id).empty()) {
+ std::this_thread::sleep_for(10ms); // TODO timeout handling
+ }
+ }
+
+ const api::StorageMessageAddress& node_address() const noexcept { return _node_address; }
+
+ std::shared_ptr<api::PutCommand> create_dummy_put_command() const {
+ auto doc_type = _doc_type_repo->getDocumentType("testdoctype1");
+ auto doc = std::make_shared<document::Document>(*doc_type, document::DocumentId("id:foo:testdoctype1::bar"));
+ doc->setFieldValue(doc->getField("hstringval"), std::make_unique<document::StringFieldValue>("hello world"));
+ return std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(0)), std::move(doc), 100);
+ }
+
+ void send_request(std::shared_ptr<api::StorageCommand> req) {
+ ASSERT_TRUE(_messages.empty());
+ _service->send_rpc_v1_request(std::move(req));
+ ASSERT_TRUE(_messages.empty()); // If non-empty, request was bounced (Slobrok lookup failed)
+ }
+
+ // TODO move StorageTransportContext away from communicationmanager.h
+ // TODO refactor reply handling to avoid duping detail code with CommunicationManager?
+ void send_response(const std::shared_ptr<api::StorageReply>& reply) {
+ std::unique_ptr<StorageTransportContext> context(dynamic_cast<StorageTransportContext*>(
+ reply->getTransportContext().release()));
+ assert(context);
+ _service->encode_rpc_v1_response(*context->_request->raw_request(), *reply);
+ context->_request->returnRequest();
+ }
+
+ [[nodiscard]] std::shared_ptr<api::StorageMessage> wait_and_receive_single_message() {
+ _messages.wait_until_n_messages_received(1);
+ return _messages.pop_first_message();
+ }
+};
+
+} // anonymous namespace
+
+// TODO consider completely mocking Slobrok to avoid any race conditions during node registration
+struct StorageApiRpcServiceTest : Test {
+ mbus::Slobrok _slobrok;
+ std::unique_ptr<StorageApiNode> _node_0;
+ std::unique_ptr<StorageApiNode> _node_1;
+
+ StorageApiRpcServiceTest()
+ : _slobrok(),
+ _node_0(std::make_unique<StorageApiNode>(1, true, _slobrok)),
+ _node_1(std::make_unique<StorageApiNode>(4, false, _slobrok))
+ {
+ // FIXME ugh, this isn't particularly pretty...
+ _node_0->wait_until_visible_in_slobrok(to_slobrok_id(_node_1->node_address()));
+ _node_1->wait_until_visible_in_slobrok(to_slobrok_id(_node_0->node_address()));
+ }
+ ~StorageApiRpcServiceTest() override;
+};
+
+StorageApiRpcServiceTest::~StorageApiRpcServiceTest() = default;
+
+TEST_F(StorageApiRpcServiceTest, can_send_and_respond_to_request_end_to_end) {
+ auto cmd = _node_0->create_dummy_put_command();
+ cmd->setAddress(_node_1->node_address());
+ ASSERT_NO_FATAL_FAILURE(_node_0->send_request(cmd));
+
+ auto recv_msg = _node_1->wait_and_receive_single_message();
+ auto* put_cmd = dynamic_cast<api::PutCommand*>(recv_msg.get());
+ ASSERT_TRUE(put_cmd != nullptr);
+ auto reply = std::shared_ptr<api::StorageReply>(put_cmd->makeReply());
+ _node_1->send_response(reply);
+
+ auto recv_reply = _node_0->wait_and_receive_single_message();
+ auto* put_reply = dynamic_cast<api::PutReply*>(recv_reply.get());
+ ASSERT_TRUE(put_reply != nullptr);
+}
+
+}