diff options
Diffstat (limited to 'messagebus_test/src/tests/speed/JavaClient.java')
-rw-r--r-- | messagebus_test/src/tests/speed/JavaClient.java | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/messagebus_test/src/tests/speed/JavaClient.java b/messagebus_test/src/tests/speed/JavaClient.java new file mode 100644 index 00000000000..b905ab07e91 --- /dev/null +++ b/messagebus_test/src/tests/speed/JavaClient.java @@ -0,0 +1,137 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.test.*; +import com.yahoo.config.*; +import com.yahoo.messagebus.routing.*; +import com.yahoo.messagebus.network.*; +import com.yahoo.messagebus.network.rpc.*; +import java.util.Arrays; +import java.util.logging.*; + +public class JavaClient implements ReplyHandler { + + private static Logger log = Logger.getLogger(JavaClient.class.getName()); + + private static class Counts { + public int okCnt = 0; + public int failCnt = 0; + Counts() {} + Counts(int okCnt, int failCnt) { + this.okCnt = okCnt; + this.failCnt = failCnt; + } + } + + private SourceSession session; + private Counts counts = new Counts(); + private static long mySeq = 100000; + + public JavaClient(RPCMessageBus mb) { + session = mb.getMessageBus().createSourceSession(this, new SourceSessionParams().setTimeout(30)); + } + + public synchronized Counts sample() { + return new Counts(counts.okCnt, counts.failCnt); + } + + public void send() { + send(++mySeq); + } + + public void send(long seq) { + session.send(new MyMessage(seq), "test"); + } + + public void handleReply(Reply reply) { + if ((reply.getProtocol() == SimpleProtocol.NAME) + && (reply.getType() == SimpleProtocol.REPLY) + && (((SimpleReply)reply).getValue().equals("OK"))) + { + synchronized (this) { + counts.okCnt++; + } + } else { + synchronized (this) { + counts.failCnt++; + } + } + try { + send(); + } catch (IllegalStateException ignore) {} // handle paranoia for shutdown source sessions + } + + public void shutdown() { + session.destroy(); + } + + public static void main(String[] args) { + try { + RPCMessageBus mb = new RPCMessageBus( + new MessageBusParams() + .setRetryPolicy(new RetryTransientErrorsPolicy().setBaseDelay(0.1)) + .addProtocol(new SimpleProtocol()), + new RPCNetworkParams() + .setIdentity(new Identity("server/java")) + .setSlobrokConfigId("file:slobrok.cfg"), + "file:routing.cfg"); + JavaClient client = new JavaClient(mb); + + // let the system 'warm up' + Thread.sleep(5000); + + // inject messages into the feedback loop + for (int i = 0; i < 1024; ++i) { + client.send(i); + } + + // let the system 'warm up' + Thread.sleep(5000); + + long start; + long stop; + Counts before; + Counts after; + + start = System.currentTimeMillis(); + before = client.sample(); + Thread.sleep(10000); // Benchmark time + stop = System.currentTimeMillis(); + after = client.sample(); + stop -= start; + double time = (double)stop; + double msgCnt = (double)(after.okCnt - before.okCnt); + double throughput = (msgCnt / time) * 1000.0; + System.out.printf("JAVA-CLIENT: %g msg/s\n", throughput); + client.shutdown(); + mb.destroy(); + if (after.failCnt > before.failCnt) { + System.err.printf("JAVA-CLIENT: FAILED (%d -> %d)\n", + before.failCnt, after.failCnt); + System.exit(1); + } + } catch (Exception e) { + log.log(Level.SEVERE, "JAVA-CLIENT: Failed", e); + System.exit(1); + } + } + + private static class MyMessage extends SimpleMessage { + + final long seqId; + + MyMessage(long seqId) { + super("message"); + this.seqId = seqId; + } + + @Override + public boolean hasSequenceId() { + return true; + } + + @Override + public long getSequenceId() { + return seqId; + } + } +} |