// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using document::test::makeDocumentBucket; using namespace ::testing; namespace storage { vespalib::string _storage("storage"); using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig; struct CommunicationManagerTest : Test { static constexpr uint32_t MESSAGE_WAIT_TIME_SEC = 60; void doTestConfigPropagation(bool isContentNode); static std::shared_ptr createDummyCommand(api::StorageMessage::Priority priority) { auto cmd = std::make_shared(makeDocumentBucket(document::BucketId(0)), document::DocumentId("id:ns:mytype::mydoc"), document::AllFields::NAME); cmd->setAddress(api::StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1)); cmd->setPriority(priority); return cmd; } }; namespace { void wait_for_slobrok_visibility(const CommunicationManager& mgr, const api::StorageMessageAddress& addr) { const auto deadline = vespalib::steady_clock::now() + 60s; do { if (mgr.address_visible_in_slobrok(addr)) { return; } std::this_thread::sleep_for(10ms); } while (vespalib::steady_clock::now() < deadline); FAIL() << "Timed out waiting for address " << addr.toString() << " to be visible in Slobrok"; } } TEST_F(CommunicationManagerTest, simple) { mbus::Slobrok slobrok; vdstestlib::DirConfig distConfig(getStandardConfig(false)); vdstestlib::DirConfig storConfig(getStandardConfig(true)); distConfig.getConfig("stor-server").set("node_index", "1"); storConfig.getConfig("stor-server").set("node_index", "1"); addSlobrokConfig(distConfig, slobrok); addSlobrokConfig(storConfig, slobrok); // Set up a "distributor" and a "storage" node with communication // managers and a dummy storage link below we can use for testing. TestServiceLayerApp storNode(storConfig.getConfigId()); TestDistributorApp distNode(distConfig.getConfigId()); auto dist_cfg_uri = config::ConfigUri(distConfig.getConfigId()); auto stor_cfg_uri = config::ConfigUri(storConfig.getConfigId()); CommunicationManager distributor(distNode.getComponentRegister(), dist_cfg_uri, *config_from(dist_cfg_uri)); CommunicationManager storage(storNode.getComponentRegister(), stor_cfg_uri, *config_from(stor_cfg_uri)); auto* distributorLink = new DummyStorageLink(); auto* storageLink = new DummyStorageLink(); distributor.push_back(std::unique_ptr(distributorLink)); storage.push_back(std::unique_ptr(storageLink)); distributor.open(); storage.open(); auto stor_addr = api::StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1); auto distr_addr = api::StorageMessageAddress::create(&_storage, lib::NodeType::DISTRIBUTOR, 1); // It is undefined when the logical nodes will be visible in each others Slobrok // mirrors, so explicitly wait until mutual visibility is ensured. Failure to do this // might cause the below message to be immediately bounced due to failing to map the // storage address to an actual RPC endpoint. ASSERT_NO_FATAL_FAILURE(wait_for_slobrok_visibility(distributor, stor_addr)); ASSERT_NO_FATAL_FAILURE(wait_for_slobrok_visibility(storage, distr_addr)); // Send a message through from distributor to storage auto cmd = std::make_shared( makeDocumentBucket(document::BucketId(0)), document::DocumentId("id:ns:mytype::mydoc"), document::AllFields::NAME); cmd->setAddress(stor_addr); distributorLink->sendUp(cmd); storageLink->waitForMessages(1, MESSAGE_WAIT_TIME_SEC); ASSERT_GT(storageLink->getNumCommands(), 0); auto cmd2 = std::dynamic_pointer_cast(storageLink->getCommand(0)); EXPECT_EQ("id:ns:mytype::mydoc", dynamic_cast(*cmd2).getDocumentId().toString()); // Reply to the message std::shared_ptr reply(cmd2->makeReply().release()); storageLink->sendUp(reply); storageLink->sendUp(reply); distributorLink->waitForMessages(1, MESSAGE_WAIT_TIME_SEC); ASSERT_GT(distributorLink->getNumCommands(), 0); auto reply2 = std::dynamic_pointer_cast(distributorLink->getCommand(0)); EXPECT_FALSE(reply2->wasFound()); } void CommunicationManagerTest::doTestConfigPropagation(bool isContentNode) { mbus::Slobrok slobrok; vdstestlib::DirConfig config(getStandardConfig(isContentNode)); config.getConfig("stor-server").set("node_index", "1"); auto& cfg = config.getConfig("stor-communicationmanager"); cfg.set("mbus_content_node_max_pending_count", "12345"); cfg.set("mbus_content_node_max_pending_size", "555666"); cfg.set("mbus_distributor_node_max_pending_count", "6789"); cfg.set("mbus_distributor_node_max_pending_size", "777888"); addSlobrokConfig(config, slobrok); std::unique_ptr node; if (isContentNode) { node = std::make_unique(config.getConfigId()); } else { node = std::make_unique(config.getConfigId()); } auto cfg_uri = config::ConfigUri(config.getConfigId()); CommunicationManager commMgr(node->getComponentRegister(), cfg_uri, *config_from(cfg_uri)); auto* storageLink = new DummyStorageLink(); commMgr.push_back(std::unique_ptr(storageLink)); commMgr.open(); // Outer type is RPCMessageBus, which wraps regular MessageBus. auto& mbus = commMgr.getMessageBus().getMessageBus(); if (isContentNode) { EXPECT_EQ(12345, mbus.getMaxPendingCount()); EXPECT_EQ(555666, mbus.getMaxPendingSize()); } else { EXPECT_EQ(6789, mbus.getMaxPendingCount()); EXPECT_EQ(777888, mbus.getMaxPendingSize()); } // Test live reconfig of limits. using ConfigBuilder = vespa::config::content::core::StorCommunicationmanagerConfigBuilder; auto liveCfg = std::make_unique(); liveCfg->mbusContentNodeMaxPendingCount = 777777; liveCfg->mbusDistributorNodeMaxPendingCount = 999999; commMgr.on_configure(*liveCfg); if (isContentNode) { EXPECT_EQ(777777, mbus.getMaxPendingCount()); } else { EXPECT_EQ(999999, mbus.getMaxPendingCount()); } } TEST_F(CommunicationManagerTest, dist_pending_limit_configs_are_propagated_to_message_bus) { doTestConfigPropagation(false); } TEST_F(CommunicationManagerTest, stor_pending_limit_configs_are_propagated_to_message_bus) { doTestConfigPropagation(true); } TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) { mbus::Slobrok slobrok; vdstestlib::DirConfig storConfig(getStandardConfig(true)); storConfig.getConfig("stor-server").set("node_index", "1"); addSlobrokConfig(storConfig, slobrok); TestServiceLayerApp storNode(storConfig.getConfigId()); auto cfg_uri = config::ConfigUri(storConfig.getConfigId()); CommunicationManager storage(storNode.getComponentRegister(), cfg_uri, *config_from(cfg_uri)); auto* storageLink = new DummyStorageLink(); storage.push_back(std::unique_ptr(storageLink)); storage.open(); // Message dequeing does not start before we invoke `open` on the storage // link chain, so we enqueue messages in randomized priority order before // doing so. After starting the thread, we should get messages down // the chain in a deterministic FIFO order and _not_ priority-order. // Lower number == higher priority. std::vector pris{200, 0, 255, 128}; for (auto pri : pris) { storage.dispatch_async(createDummyCommand(pri)); } storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC); for (size_t i = 0; i < pris.size(); ++i) { // Casting is just to avoid getting mismatched values printed to the // output verbatim as chars. EXPECT_EQ( uint32_t(pris[i]), uint32_t(storageLink->getCommand(i)->getPriority())); } } TEST_F(CommunicationManagerTest, replies_are_dequeued_in_fifo_order) { mbus::Slobrok slobrok; vdstestlib::DirConfig storConfig(getStandardConfig(true)); storConfig.getConfig("stor-server").set("node_index", "1"); addSlobrokConfig(storConfig, slobrok); TestServiceLayerApp storNode(storConfig.getConfigId()); auto cfg_uri = config::ConfigUri(storConfig.getConfigId()); CommunicationManager storage(storNode.getComponentRegister(), cfg_uri, *config_from(cfg_uri)); auto* storageLink = new DummyStorageLink(); storage.push_back(std::unique_ptr(storageLink)); storage.open(); std::vector pris{200, 0, 255, 128}; for (auto pri : pris) { storage.dispatch_async(createDummyCommand(pri)->makeReply()); } storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC); // Want FIFO order for replies, not priority-sorted order. for (size_t i = 0; i < pris.size(); ++i) { EXPECT_EQ( uint32_t(pris[i]), uint32_t(storageLink->getCommand(i)->getPriority())); } } struct MockMbusReplyHandler : mbus::IReplyHandler { std::vector> replies; void handleReply(std::unique_ptr msg) override { replies.emplace_back(std::move(msg)); } }; struct CommunicationManagerFixture { MockMbusReplyHandler reply_handler; mbus::Slobrok slobrok; std::unique_ptr node; std::unique_ptr comm_mgr; DummyStorageLink* bottom_link; CommunicationManagerFixture() { vdstestlib::DirConfig stor_config(getStandardConfig(true)); stor_config.getConfig("stor-server").set("node_index", "1"); addSlobrokConfig(stor_config, slobrok); node = std::make_unique(stor_config.getConfigId()); auto cfg_uri = config::ConfigUri(stor_config.getConfigId()); comm_mgr = std::make_unique(node->getComponentRegister(), cfg_uri, *config_from(cfg_uri)); bottom_link = new DummyStorageLink(); comm_mgr->push_back(std::unique_ptr(bottom_link)); comm_mgr->open(); } ~CommunicationManagerFixture(); template std::unique_ptr documentapi_message_for_space(const char *space) { auto cmd = std::make_unique(document::DocumentId(vespalib::make_string("id::%s::stuff", space))); // Bind reply handling to our own mock handler cmd->pushHandler(reply_handler); return cmd; } std::unique_ptr documentapi_remove_message_for_space(const char *space) { return documentapi_message_for_space(space); } std::unique_ptr documentapi_get_message_for_space(const char *space) { return documentapi_message_for_space(space); } }; CommunicationManagerFixture::~CommunicationManagerFixture() = default; using vespa::config::content::core::BucketspacesConfigBuilder; namespace { BucketspacesConfigBuilder::Documenttype doc_type(vespalib::stringref name, vespalib::stringref space) { BucketspacesConfigBuilder::Documenttype dt; dt.name = name; dt.bucketspace = space; return dt; } } TEST_F(CommunicationManagerTest, bucket_space_config_can_be_updated_live) { CommunicationManagerFixture f; BucketspacesConfigBuilder config; config.documenttype.emplace_back(doc_type("foo", "default")); config.documenttype.emplace_back(doc_type("bar", "global")); f.comm_mgr->updateBucketSpacesConfig(config); f.comm_mgr->handleMessage(f.documentapi_remove_message_for_space("bar")); f.comm_mgr->handleMessage(f.documentapi_remove_message_for_space("foo")); f.bottom_link->waitForMessages(2, MESSAGE_WAIT_TIME_SEC); auto cmd1 = f.bottom_link->getCommand(0); EXPECT_EQ(document::FixedBucketSpaces::global_space(), cmd1->getBucket().getBucketSpace()); auto cmd2 = f.bottom_link->getCommand(1); EXPECT_EQ(document::FixedBucketSpaces::default_space(), cmd2->getBucket().getBucketSpace()); config.documenttype[1] = doc_type("bar", "default"); f.comm_mgr->updateBucketSpacesConfig(config); f.comm_mgr->handleMessage(f.documentapi_remove_message_for_space("bar")); f.bottom_link->waitForMessages(3, MESSAGE_WAIT_TIME_SEC); auto cmd3 = f.bottom_link->getCommand(2); EXPECT_EQ(document::FixedBucketSpaces::default_space(), cmd3->getBucket().getBucketSpace()); EXPECT_EQ(uint64_t(0), f.comm_mgr->metrics().bucketSpaceMappingFailures.getValue()); } TEST_F(CommunicationManagerTest, unmapped_bucket_space_documentapi_request_returns_error_reply) { CommunicationManagerFixture f; BucketspacesConfigBuilder config; config.documenttype.emplace_back(doc_type("foo", "default")); f.comm_mgr->updateBucketSpacesConfig(config); EXPECT_EQ(uint64_t(0), f.comm_mgr->metrics().bucketSpaceMappingFailures.getValue()); f.comm_mgr->handleMessage(f.documentapi_remove_message_for_space("fluff")); ASSERT_EQ(1, f.reply_handler.replies.size()); auto& reply = *f.reply_handler.replies[0]; ASSERT_TRUE(reply.hasErrors()); EXPECT_EQ(static_cast(api::ReturnCode::REJECTED), reply.getError(0).getCode()); EXPECT_EQ(uint64_t(1), f.comm_mgr->metrics().bucketSpaceMappingFailures.getValue()); } TEST_F(CommunicationManagerTest, unmapped_bucket_space_for_get_documentapi_request_returns_error_reply) { CommunicationManagerFixture f; BucketspacesConfigBuilder config; config.documenttype.emplace_back(doc_type("foo", "default")); f.comm_mgr->updateBucketSpacesConfig(config); f.comm_mgr->handleMessage(f.documentapi_get_message_for_space("fluff")); ASSERT_EQ(1, f.reply_handler.replies.size()); auto& reply = *f.reply_handler.replies[0]; ASSERT_TRUE(reply.hasErrors()); EXPECT_EQ(static_cast(api::ReturnCode::REJECTED), reply.getError(0).getCode()); EXPECT_EQ(uint64_t(1), f.comm_mgr->metrics().bucketSpaceMappingFailures.getValue()); } TEST_F(CommunicationManagerTest, communication_manager_swallows_internal_replies) { CommunicationManagerFixture f; auto msg = std::make_unique(makeDocumentBucket({16, 1})); auto reply = std::shared_ptr(msg->makeReply()); EXPECT_TRUE(f.comm_mgr->onUp(reply)); // true == handled by storage link } } // storage