diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /fnet/src/tests/connection_spread/connection_spread_test.cpp |
Publish
Diffstat (limited to 'fnet/src/tests/connection_spread/connection_spread_test.cpp')
-rw-r--r-- | fnet/src/tests/connection_spread/connection_spread_test.cpp | 83 |
1 files changed, 83 insertions, 0 deletions
diff --git a/fnet/src/tests/connection_spread/connection_spread_test.cpp b/fnet/src/tests/connection_spread/connection_spread_test.cpp new file mode 100644 index 00000000000..a10278daf32 --- /dev/null +++ b/fnet/src/tests/connection_spread/connection_spread_test.cpp @@ -0,0 +1,83 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/fnet/fnet.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <thread> +#include <chrono> +#include <set> + +using namespace std::literals; + +struct DummyAdapter : FNET_IServerAdapter { + bool InitAdminChannel(FNET_Channel *) override { return false; } + bool InitChannel(FNET_Channel *, uint32_t) override { return false; } +}; + +struct DummyStreamer : FNET_IPacketStreamer { + bool GetPacketInfo(FNET_DataBuffer *, uint32_t *, uint32_t *, uint32_t *, bool *) override { return false; } + FNET_Packet *Decode(FNET_DataBuffer *, uint32_t, uint32_t, FNET_Context) override { return nullptr; } + void Encode(FNET_Packet *, uint32_t, FNET_DataBuffer *) override {} +}; + +struct Fixture { + DummyStreamer streamer; + DummyAdapter adapter; + FastOS_ThreadPool thread_pool; + FNET_Transport client; + FNET_Transport server; + Fixture() : streamer(), adapter(), thread_pool(128 * 1024), client(8), server(8) + { + ASSERT_TRUE(client.Start(&thread_pool)); + ASSERT_TRUE(server.Start(&thread_pool)); + } + void wait_for_components(size_t client_cnt, size_t server_cnt) { + bool ok = false; + for (size_t i = 0; !ok && (i < 10000); ++i) { + std::this_thread::sleep_for(3ms); + ok = ((client.GetNumIOComponents() == client_cnt) && + (server.GetNumIOComponents() == server_cnt)); + } + EXPECT_EQUAL(client.GetNumIOComponents(), client_cnt); + EXPECT_EQUAL(server.GetNumIOComponents(), server_cnt); + } + ~Fixture() { + server.ShutDown(true); + client.ShutDown(true); + thread_pool.Close(); + } +}; + +void check_threads(FNET_Transport &transport, size_t num_threads, const vespalib::string &tag) { + std::set<FNET_TransportThread *> threads; + while (threads.size() < num_threads) { + threads.insert(transport.select_thread(nullptr, 0)); + } + for (auto thread: threads) { + uint32_t cnt = thread->GetNumIOComponents(); + fprintf(stderr, "-- %s thread: %u io components\n", tag.c_str(), cnt); + EXPECT_GREATER(cnt, 1u); + } +} + +TEST_F("require that connections are spread among transport threads", Fixture) +{ + FNET_Connector *listener = f1.server.Listen("tcp/0", &f1.streamer, &f1.adapter); + ASSERT_TRUE(listener); + uint32_t port = listener->GetPortNumber(); + vespalib::string spec = vespalib::make_string("tcp/localhost:%u", port); + std::vector<FNET_Connection *> connections; + for (size_t i = 0; i < 256; ++i) { + std::this_thread::sleep_for(1ms); + connections.push_back(f1.client.Connect(spec.c_str(), &f1.streamer)); + ASSERT_TRUE(connections.back()); + } + f1.wait_for_components(256, 257); + check_threads(f1.client, 8, "client"); + check_threads(f1.server, 8, "server"); + listener->SubRef(); + for (FNET_Connection *conn: connections) { + conn->SubRef(); + } +} + +TEST_MAIN() { TEST_RUN_ALL(); } |