// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace std::literals; struct DummyAdapter : FNET_IServerAdapter { bool InitChannel(FNET_Channel *, uint32_t) override { return false; } }; struct DummyStreamer : FNET_IPacketStreamer { bool GetPacketInfo(FNET_DataBuffer *, uint32_t *, uint32_t *, uint32_t *, bool *) override { return false; } FNET_Packet *Decode(FNET_DataBuffer *, uint32_t, uint32_t, FNET_Context) override { return nullptr; } void Encode(FNET_Packet *, uint32_t, FNET_DataBuffer *) override {} }; struct Fixture { DummyStreamer streamer; DummyAdapter adapter; FastOS_ThreadPool thread_pool; FNET_Transport client; FNET_Transport server; Fixture() : streamer(), adapter(), thread_pool(), client(8), server(8) { ASSERT_TRUE(client.Start(&thread_pool)); ASSERT_TRUE(server.Start(&thread_pool)); } void wait_for_components(size_t client_cnt, size_t server_cnt) { bool ok = false; for (size_t i = 0; !ok && (i < 10000); ++i) { std::this_thread::sleep_for(3ms); ok = ((client.GetNumIOComponents() == client_cnt) && (server.GetNumIOComponents() == server_cnt)); } EXPECT_EQUAL(client.GetNumIOComponents(), client_cnt); EXPECT_EQUAL(server.GetNumIOComponents(), server_cnt); } ~Fixture() { server.ShutDown(true); client.ShutDown(true); thread_pool.Close(); } }; void check_threads(FNET_Transport &transport, size_t num_threads, const vespalib::string &tag) { std::set threads; while (threads.size() < num_threads) { threads.insert(transport.select_thread(nullptr, 0)); } for (auto thread: threads) { uint32_t cnt = thread->GetNumIOComponents(); fprintf(stderr, "-- %s thread: %u io components\n", tag.c_str(), cnt); EXPECT_GREATER(cnt, 1u); } } TEST_F("require that connections are spread among transport threads", Fixture) { FNET_Connector *listener = f1.server.Listen("tcp/0", &f1.streamer, &f1.adapter); ASSERT_TRUE(listener); uint32_t port = listener->GetPortNumber(); vespalib::string spec = vespalib::make_string("tcp/localhost:%u", port); std::vector connections; for (size_t i = 0; i < 256; ++i) { std::this_thread::sleep_for(1ms); if (i > f1.server.GetNumIOComponents() + 16) { /* * tcp listen backlog is limited (cf. SOMAXCONN). * Slow down when getting too far ahead of server. */ std::this_thread::sleep_for(10ms); } connections.push_back(f1.client.Connect(spec.c_str(), &f1.streamer)); ASSERT_TRUE(connections.back()); } f1.wait_for_components(256, 257); check_threads(f1.client, 8, "client"); check_threads(f1.server, 8, "server"); listener->SubRef(); for (FNET_Connection *conn: connections) { conn->SubRef(); } } TEST_MAIN() { TEST_RUN_ALL(); }