summaryrefslogtreecommitdiffstats
path: root/fnet/src/tests/connection_spread/connection_spread_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'fnet/src/tests/connection_spread/connection_spread_test.cpp')
-rw-r--r--fnet/src/tests/connection_spread/connection_spread_test.cpp83
1 files changed, 83 insertions, 0 deletions
diff --git a/fnet/src/tests/connection_spread/connection_spread_test.cpp b/fnet/src/tests/connection_spread/connection_spread_test.cpp
new file mode 100644
index 00000000000..a10278daf32
--- /dev/null
+++ b/fnet/src/tests/connection_spread/connection_spread_test.cpp
@@ -0,0 +1,83 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/fnet/fnet.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <thread>
+#include <chrono>
+#include <set>
+
+using namespace std::literals;
+
+struct DummyAdapter : FNET_IServerAdapter {
+ bool InitAdminChannel(FNET_Channel *) override { return false; }
+ bool InitChannel(FNET_Channel *, uint32_t) override { return false; }
+};
+
+struct DummyStreamer : FNET_IPacketStreamer {
+ bool GetPacketInfo(FNET_DataBuffer *, uint32_t *, uint32_t *, uint32_t *, bool *) override { return false; }
+ FNET_Packet *Decode(FNET_DataBuffer *, uint32_t, uint32_t, FNET_Context) override { return nullptr; }
+ void Encode(FNET_Packet *, uint32_t, FNET_DataBuffer *) override {}
+};
+
+struct Fixture {
+ DummyStreamer streamer;
+ DummyAdapter adapter;
+ FastOS_ThreadPool thread_pool;
+ FNET_Transport client;
+ FNET_Transport server;
+ Fixture() : streamer(), adapter(), thread_pool(128 * 1024), client(8), server(8)
+ {
+ ASSERT_TRUE(client.Start(&thread_pool));
+ ASSERT_TRUE(server.Start(&thread_pool));
+ }
+ void wait_for_components(size_t client_cnt, size_t server_cnt) {
+ bool ok = false;
+ for (size_t i = 0; !ok && (i < 10000); ++i) {
+ std::this_thread::sleep_for(3ms);
+ ok = ((client.GetNumIOComponents() == client_cnt) &&
+ (server.GetNumIOComponents() == server_cnt));
+ }
+ EXPECT_EQUAL(client.GetNumIOComponents(), client_cnt);
+ EXPECT_EQUAL(server.GetNumIOComponents(), server_cnt);
+ }
+ ~Fixture() {
+ server.ShutDown(true);
+ client.ShutDown(true);
+ thread_pool.Close();
+ }
+};
+
+void check_threads(FNET_Transport &transport, size_t num_threads, const vespalib::string &tag) {
+ std::set<FNET_TransportThread *> threads;
+ while (threads.size() < num_threads) {
+ threads.insert(transport.select_thread(nullptr, 0));
+ }
+ for (auto thread: threads) {
+ uint32_t cnt = thread->GetNumIOComponents();
+ fprintf(stderr, "-- %s thread: %u io components\n", tag.c_str(), cnt);
+ EXPECT_GREATER(cnt, 1u);
+ }
+}
+
+TEST_F("require that connections are spread among transport threads", Fixture)
+{
+ FNET_Connector *listener = f1.server.Listen("tcp/0", &f1.streamer, &f1.adapter);
+ ASSERT_TRUE(listener);
+ uint32_t port = listener->GetPortNumber();
+ vespalib::string spec = vespalib::make_string("tcp/localhost:%u", port);
+ std::vector<FNET_Connection *> connections;
+ for (size_t i = 0; i < 256; ++i) {
+ std::this_thread::sleep_for(1ms);
+ connections.push_back(f1.client.Connect(spec.c_str(), &f1.streamer));
+ ASSERT_TRUE(connections.back());
+ }
+ f1.wait_for_components(256, 257);
+ check_threads(f1.client, 8, "client");
+ check_threads(f1.server, 8, "server");
+ listener->SubRef();
+ for (FNET_Connection *conn: connections) {
+ conn->SubRef();
+ }
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }