diff options
author | Håvard Pettersen <havardpe@oath.com> | 2019-05-09 13:46:51 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2019-05-13 10:05:24 +0000 |
commit | 032ec0ed6f65a355c5c6402f2e2daae1f6ea5b00 (patch) | |
tree | 4a56cf3323bee758b962d723f8fa0b6c74ead7ed /jrt/tests/com | |
parent | c9f89a485d3dee9ddffb5107b31bf0bae91b18d4 (diff) |
multi-threaded transport for JRT
Diffstat (limited to 'jrt/tests/com')
-rw-r--r-- | jrt/tests/com/yahoo/jrt/EchoTest.java | 4 | ||||
-rw-r--r-- | jrt/tests/com/yahoo/jrt/LatencyTest.java | 170 | ||||
-rw-r--r-- | jrt/tests/com/yahoo/jrt/SessionTest.java | 4 |
3 files changed, 134 insertions, 44 deletions
diff --git a/jrt/tests/com/yahoo/jrt/EchoTest.java b/jrt/tests/com/yahoo/jrt/EchoTest.java index 16f18afb58c..67544d3f1d4 100644 --- a/jrt/tests/com/yahoo/jrt/EchoTest.java +++ b/jrt/tests/com/yahoo/jrt/EchoTest.java @@ -91,8 +91,8 @@ public class EchoTest { public void setUp() throws ListenFailedException { metrics = TransportMetrics.getInstance(); startSnapshot = metrics.snapshot(); - server = new Supervisor(new Transport(crypto)); - client = new Supervisor(new Transport(crypto)); + server = new Supervisor(new Transport(crypto, 1)); + client = new Supervisor(new Transport(crypto, 1)); acceptor = server.listen(new Spec(0)); target = client.connect(new Spec("localhost", acceptor.port())); server.addMethod(new Method("echo", "*", "*", this, "rpc_echo")); diff --git a/jrt/tests/com/yahoo/jrt/LatencyTest.java b/jrt/tests/com/yahoo/jrt/LatencyTest.java index a1f71bda013..e8cd6cdc017 100644 --- a/jrt/tests/com/yahoo/jrt/LatencyTest.java +++ b/jrt/tests/com/yahoo/jrt/LatencyTest.java @@ -5,6 +5,8 @@ package com.yahoo.jrt; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.logging.Logger; import static com.yahoo.jrt.CryptoUtils.createTestTlsContext; @@ -14,71 +16,159 @@ import static org.junit.Assert.assertTrue; public class LatencyTest { private static final Logger log = Logger.getLogger(LatencyTest.class.getName()); - private static class Server implements AutoCloseable { - private Supervisor orb; - private Acceptor acceptor; - public Server(CryptoEngine crypto) throws ListenFailedException { - orb = new Supervisor(new Transport(crypto)); - acceptor = orb.listen(new Spec(0)); - orb.addMethod(new Method("inc", "i", "i", this, "rpc_inc")); + private static class Network implements AutoCloseable { + private final Supervisor server; + private final Supervisor client; + private final Acceptor acceptor; + public Network(CryptoEngine crypto, int threads) throws ListenFailedException { + server = new Supervisor(new Transport(crypto, threads)); + client = new Supervisor(new Transport(crypto, threads)); + server.addMethod(new Method("inc", "i", "i", this, "rpc_inc")); + acceptor = server.listen(new Spec(0)); } public Target connect() { - return orb.connect(new Spec("localhost", acceptor.port())); + return client.connect(new Spec("localhost", acceptor.port())); } public void rpc_inc(Request req) { req.returnValues().add(new Int32Value(req.parameters().get(0).asInt32() + 1)); } public void close() { acceptor.shutdown().join(); - orb.transport().shutdown().join(); + client.transport().shutdown().join(); + server.transport().shutdown().join(); } } - private void measureLatency(String prefix, Server server, boolean reconnect) { - int value = 100; - List<Double> list = new ArrayList<>(); - Target target = server.connect(); - for (int i = 0; i < 64; ++i) { - long before = System.nanoTime(); - if (reconnect) { + private static class Client { + + public static class Result { + public final double latency; + public final double throughput; + + public Result(double ms, double cnt) { + latency = ms; + throughput = cnt; + } + + public Result(Result[] results) { + double ms = 0.0; + double cnt = 0.0; + for (Result r: results) { + ms += r.latency; + cnt += r.throughput; + } + latency = (ms / results.length); + throughput = cnt; + } + } + + private final boolean reconnect; + private final Network network; + private final CyclicBarrier barrier; + private final CountDownLatch latch; + private final Throwable[] issues; + private final Result[] results; + + private void run(int threadId) { + try { + barrier.await(); + int value = 100; + final int warmupCnt = 10; + final int benchmarkCnt = 50; + final int cooldownCnt = 10; + final int totalReqs = (warmupCnt + benchmarkCnt + cooldownCnt); + long t1 = 0; + long t2 = 0; + List<Double> list = new ArrayList<>(); + Target target = network.connect(); + for (int i = 0; i < totalReqs; ++i) { + long before = System.nanoTime(); + if (i == warmupCnt) { + t1 = before; + } + if (i == (warmupCnt + benchmarkCnt)) { + t2 = before; + } + if (reconnect) { + target.close(); + target = network.connect(); + } + Request req = new Request("inc"); + req.parameters().add(new Int32Value(value)); + target.invokeSync(req, 60.0); + long duration = System.nanoTime() - before; + assertTrue(req.checkReturnTypes("i")); + assertEquals(value + 1, req.returnValues().get(0).asInt32()); + value++; + list.add(duration / 1000000.0); + } target.close(); - target = server.connect(); + Collections.sort(list); + double benchTime = (t2 - t1) / 1000000000.0; + results[threadId] = new Result(list.get(list.size() / 2), benchmarkCnt / benchTime); + } catch (Throwable issue) { + issues[threadId] = issue; + } finally { + latch.countDown(); } - Request req = new Request("inc"); - req.parameters().add(new Int32Value(value)); - target.invokeSync(req, 60.0); - assertTrue(req.checkReturnTypes("i")); - assertEquals(value + 1, req.returnValues().get(0).asInt32()); - value++; - long duration = System.nanoTime() - before; - list.add(duration / 1000000.0); } - target.close(); - Collections.sort(list); - log.info(prefix + "invocation latency: " + list.get(list.size() / 2) + " ms"); + + public Client(boolean reconnect, Network network, int numThreads) { + this.reconnect = reconnect; + this.network = network; + this.barrier = new CyclicBarrier(numThreads); + this.latch = new CountDownLatch(numThreads); + this.issues = new Throwable[numThreads]; + this.results = new Result[numThreads]; + } + + public void measureLatency(String prefix) throws Throwable { + for (int i = 0; i < results.length; ++i) { + final int threadId = i; + new Thread(()->run(threadId)).start(); + } + latch.await(); + for (Throwable issue: issues) { + if (issue != null) { + throw(issue); + } + } + Result result = new Result(results); + log.info(prefix + "latency: " + result.latency + " ms, throughput: " + result.throughput + " req/s"); + } } @org.junit.Test - public void testNullCryptoLatency() throws ListenFailedException { - try (Server server = new Server(new NullCryptoEngine())) { - measureLatency("[null crypto, no reconnect] ", server, false); - measureLatency("[null crypto, reconnect] ", server, true); + public void testNullCryptoLatency() throws Throwable { + try (Network network = new Network(new NullCryptoEngine(), 1)) { + new Client(false, network, 1).measureLatency("[null crypto, no reconnect] "); + new Client(true, network, 1).measureLatency("[null crypto, reconnect] "); } } @org.junit.Test - public void testXorCryptoLatency() throws ListenFailedException { - try (Server server = new Server(new XorCryptoEngine())) { - measureLatency("[xor crypto, no reconnect] ", server, false); - measureLatency("[xor crypto, reconnect] ", server, true); + public void testXorCryptoLatency() throws Throwable { + try (Network network = new Network(new XorCryptoEngine(), 1)) { + new Client(false, network, 1).measureLatency("[xor crypto, no reconnect] "); + new Client(true, network, 1).measureLatency("[xor crypto, reconnect] "); } } @org.junit.Test - public void testTlsCryptoLatency() throws ListenFailedException { - try (Server server = new Server(new TlsCryptoEngine(createTestTlsContext()))) { - measureLatency("[tls crypto, no reconnect] ", server, false); - measureLatency("[tls crypto, reconnect] ", server, true); + public void testTlsCryptoLatency() throws Throwable { + try (Network network = new Network(new TlsCryptoEngine(createTestTlsContext()), 1)) { + new Client(false, network, 1).measureLatency("[tls crypto, no reconnect] "); + new Client(true, network, 1).measureLatency("[tls crypto, reconnect] "); + } + } + + @org.junit.Test + public void testTransportThreadScaling() throws Throwable { + try (Network network = new Network(new NullCryptoEngine(), 1)) { + new Client(false, network, 64).measureLatency("[64 clients, 1/1 transport] "); + } + try (Network network = new Network(new NullCryptoEngine(), 4)) { + new Client(false, network, 64).measureLatency("[64 clients, 4/4 transport] "); } } } diff --git a/jrt/tests/com/yahoo/jrt/SessionTest.java b/jrt/tests/com/yahoo/jrt/SessionTest.java index 6f070959d7a..dc33af96e44 100644 --- a/jrt/tests/com/yahoo/jrt/SessionTest.java +++ b/jrt/tests/com/yahoo/jrt/SessionTest.java @@ -122,9 +122,9 @@ public class SessionTest implements SessionHandler { @Before public void setUp() throws ListenFailedException { Session.reset(); - server = new Test.Orb(new Transport(crypto)); + server = new Test.Orb(new Transport(crypto, 1)); server.setSessionHandler(this); - client = new Test.Orb(new Transport(crypto)); + client = new Test.Orb(new Transport(crypto, 1)); client.setSessionHandler(this); acceptor = server.listen(new Spec(0)); target = client.connect(new Spec("localhost", acceptor.port()), |