summaryrefslogtreecommitdiffstats
path: root/jrt/tests
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2019-05-09 13:46:51 +0000
committerHåvard Pettersen <havardpe@oath.com>2019-05-13 10:05:24 +0000
commit032ec0ed6f65a355c5c6402f2e2daae1f6ea5b00 (patch)
tree4a56cf3323bee758b962d723f8fa0b6c74ead7ed /jrt/tests
parentc9f89a485d3dee9ddffb5107b31bf0bae91b18d4 (diff)
multi-threaded transport for JRT
Diffstat (limited to 'jrt/tests')
-rw-r--r--jrt/tests/com/yahoo/jrt/EchoTest.java4
-rw-r--r--jrt/tests/com/yahoo/jrt/LatencyTest.java170
-rw-r--r--jrt/tests/com/yahoo/jrt/SessionTest.java4
3 files changed, 134 insertions, 44 deletions
diff --git a/jrt/tests/com/yahoo/jrt/EchoTest.java b/jrt/tests/com/yahoo/jrt/EchoTest.java
index 16f18afb58c..67544d3f1d4 100644
--- a/jrt/tests/com/yahoo/jrt/EchoTest.java
+++ b/jrt/tests/com/yahoo/jrt/EchoTest.java
@@ -91,8 +91,8 @@ public class EchoTest {
public void setUp() throws ListenFailedException {
metrics = TransportMetrics.getInstance();
startSnapshot = metrics.snapshot();
- server = new Supervisor(new Transport(crypto));
- client = new Supervisor(new Transport(crypto));
+ server = new Supervisor(new Transport(crypto, 1));
+ client = new Supervisor(new Transport(crypto, 1));
acceptor = server.listen(new Spec(0));
target = client.connect(new Spec("localhost", acceptor.port()));
server.addMethod(new Method("echo", "*", "*", this, "rpc_echo"));
diff --git a/jrt/tests/com/yahoo/jrt/LatencyTest.java b/jrt/tests/com/yahoo/jrt/LatencyTest.java
index a1f71bda013..e8cd6cdc017 100644
--- a/jrt/tests/com/yahoo/jrt/LatencyTest.java
+++ b/jrt/tests/com/yahoo/jrt/LatencyTest.java
@@ -5,6 +5,8 @@ package com.yahoo.jrt;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.logging.Logger;
import static com.yahoo.jrt.CryptoUtils.createTestTlsContext;
@@ -14,71 +16,159 @@ import static org.junit.Assert.assertTrue;
public class LatencyTest {
private static final Logger log = Logger.getLogger(LatencyTest.class.getName());
- private static class Server implements AutoCloseable {
- private Supervisor orb;
- private Acceptor acceptor;
- public Server(CryptoEngine crypto) throws ListenFailedException {
- orb = new Supervisor(new Transport(crypto));
- acceptor = orb.listen(new Spec(0));
- orb.addMethod(new Method("inc", "i", "i", this, "rpc_inc"));
+ private static class Network implements AutoCloseable {
+ private final Supervisor server;
+ private final Supervisor client;
+ private final Acceptor acceptor;
+ public Network(CryptoEngine crypto, int threads) throws ListenFailedException {
+ server = new Supervisor(new Transport(crypto, threads));
+ client = new Supervisor(new Transport(crypto, threads));
+ server.addMethod(new Method("inc", "i", "i", this, "rpc_inc"));
+ acceptor = server.listen(new Spec(0));
}
public Target connect() {
- return orb.connect(new Spec("localhost", acceptor.port()));
+ return client.connect(new Spec("localhost", acceptor.port()));
}
public void rpc_inc(Request req) {
req.returnValues().add(new Int32Value(req.parameters().get(0).asInt32() + 1));
}
public void close() {
acceptor.shutdown().join();
- orb.transport().shutdown().join();
+ client.transport().shutdown().join();
+ server.transport().shutdown().join();
}
}
- private void measureLatency(String prefix, Server server, boolean reconnect) {
- int value = 100;
- List<Double> list = new ArrayList<>();
- Target target = server.connect();
- for (int i = 0; i < 64; ++i) {
- long before = System.nanoTime();
- if (reconnect) {
+ private static class Client {
+
+ public static class Result {
+ public final double latency;
+ public final double throughput;
+
+ public Result(double ms, double cnt) {
+ latency = ms;
+ throughput = cnt;
+ }
+
+ public Result(Result[] results) {
+ double ms = 0.0;
+ double cnt = 0.0;
+ for (Result r: results) {
+ ms += r.latency;
+ cnt += r.throughput;
+ }
+ latency = (ms / results.length);
+ throughput = cnt;
+ }
+ }
+
+ private final boolean reconnect;
+ private final Network network;
+ private final CyclicBarrier barrier;
+ private final CountDownLatch latch;
+ private final Throwable[] issues;
+ private final Result[] results;
+
+ private void run(int threadId) {
+ try {
+ barrier.await();
+ int value = 100;
+ final int warmupCnt = 10;
+ final int benchmarkCnt = 50;
+ final int cooldownCnt = 10;
+ final int totalReqs = (warmupCnt + benchmarkCnt + cooldownCnt);
+ long t1 = 0;
+ long t2 = 0;
+ List<Double> list = new ArrayList<>();
+ Target target = network.connect();
+ for (int i = 0; i < totalReqs; ++i) {
+ long before = System.nanoTime();
+ if (i == warmupCnt) {
+ t1 = before;
+ }
+ if (i == (warmupCnt + benchmarkCnt)) {
+ t2 = before;
+ }
+ if (reconnect) {
+ target.close();
+ target = network.connect();
+ }
+ Request req = new Request("inc");
+ req.parameters().add(new Int32Value(value));
+ target.invokeSync(req, 60.0);
+ long duration = System.nanoTime() - before;
+ assertTrue(req.checkReturnTypes("i"));
+ assertEquals(value + 1, req.returnValues().get(0).asInt32());
+ value++;
+ list.add(duration / 1000000.0);
+ }
target.close();
- target = server.connect();
+ Collections.sort(list);
+ double benchTime = (t2 - t1) / 1000000000.0;
+ results[threadId] = new Result(list.get(list.size() / 2), benchmarkCnt / benchTime);
+ } catch (Throwable issue) {
+ issues[threadId] = issue;
+ } finally {
+ latch.countDown();
}
- Request req = new Request("inc");
- req.parameters().add(new Int32Value(value));
- target.invokeSync(req, 60.0);
- assertTrue(req.checkReturnTypes("i"));
- assertEquals(value + 1, req.returnValues().get(0).asInt32());
- value++;
- long duration = System.nanoTime() - before;
- list.add(duration / 1000000.0);
}
- target.close();
- Collections.sort(list);
- log.info(prefix + "invocation latency: " + list.get(list.size() / 2) + " ms");
+
+ public Client(boolean reconnect, Network network, int numThreads) {
+ this.reconnect = reconnect;
+ this.network = network;
+ this.barrier = new CyclicBarrier(numThreads);
+ this.latch = new CountDownLatch(numThreads);
+ this.issues = new Throwable[numThreads];
+ this.results = new Result[numThreads];
+ }
+
+ public void measureLatency(String prefix) throws Throwable {
+ for (int i = 0; i < results.length; ++i) {
+ final int threadId = i;
+ new Thread(()->run(threadId)).start();
+ }
+ latch.await();
+ for (Throwable issue: issues) {
+ if (issue != null) {
+ throw(issue);
+ }
+ }
+ Result result = new Result(results);
+ log.info(prefix + "latency: " + result.latency + " ms, throughput: " + result.throughput + " req/s");
+ }
}
@org.junit.Test
- public void testNullCryptoLatency() throws ListenFailedException {
- try (Server server = new Server(new NullCryptoEngine())) {
- measureLatency("[null crypto, no reconnect] ", server, false);
- measureLatency("[null crypto, reconnect] ", server, true);
+ public void testNullCryptoLatency() throws Throwable {
+ try (Network network = new Network(new NullCryptoEngine(), 1)) {
+ new Client(false, network, 1).measureLatency("[null crypto, no reconnect] ");
+ new Client(true, network, 1).measureLatency("[null crypto, reconnect] ");
}
}
@org.junit.Test
- public void testXorCryptoLatency() throws ListenFailedException {
- try (Server server = new Server(new XorCryptoEngine())) {
- measureLatency("[xor crypto, no reconnect] ", server, false);
- measureLatency("[xor crypto, reconnect] ", server, true);
+ public void testXorCryptoLatency() throws Throwable {
+ try (Network network = new Network(new XorCryptoEngine(), 1)) {
+ new Client(false, network, 1).measureLatency("[xor crypto, no reconnect] ");
+ new Client(true, network, 1).measureLatency("[xor crypto, reconnect] ");
}
}
@org.junit.Test
- public void testTlsCryptoLatency() throws ListenFailedException {
- try (Server server = new Server(new TlsCryptoEngine(createTestTlsContext()))) {
- measureLatency("[tls crypto, no reconnect] ", server, false);
- measureLatency("[tls crypto, reconnect] ", server, true);
+ public void testTlsCryptoLatency() throws Throwable {
+ try (Network network = new Network(new TlsCryptoEngine(createTestTlsContext()), 1)) {
+ new Client(false, network, 1).measureLatency("[tls crypto, no reconnect] ");
+ new Client(true, network, 1).measureLatency("[tls crypto, reconnect] ");
+ }
+ }
+
+ @org.junit.Test
+ public void testTransportThreadScaling() throws Throwable {
+ try (Network network = new Network(new NullCryptoEngine(), 1)) {
+ new Client(false, network, 64).measureLatency("[64 clients, 1/1 transport] ");
+ }
+ try (Network network = new Network(new NullCryptoEngine(), 4)) {
+ new Client(false, network, 64).measureLatency("[64 clients, 4/4 transport] ");
}
}
}
diff --git a/jrt/tests/com/yahoo/jrt/SessionTest.java b/jrt/tests/com/yahoo/jrt/SessionTest.java
index 6f070959d7a..dc33af96e44 100644
--- a/jrt/tests/com/yahoo/jrt/SessionTest.java
+++ b/jrt/tests/com/yahoo/jrt/SessionTest.java
@@ -122,9 +122,9 @@ public class SessionTest implements SessionHandler {
@Before
public void setUp() throws ListenFailedException {
Session.reset();
- server = new Test.Orb(new Transport(crypto));
+ server = new Test.Orb(new Transport(crypto, 1));
server.setSessionHandler(this);
- client = new Test.Orb(new Transport(crypto));
+ client = new Test.Orb(new Transport(crypto, 1));
client.setSessionHandler(this);
acceptor = server.listen(new Spec(0));
target = client.connect(new Spec("localhost", acceptor.port()),