diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-13 17:40:49 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-13 17:40:49 +0200 |
commit | df13ce307c399c649ae2fb2730557882380998c2 (patch) | |
tree | c20ce83733d8c911c1c78965dd9f36d61106901b /vespaclient-java/src/main/java/com/yahoo/dummyreceiver | |
parent | 377749b786f6a24a67b4624d8114878117a4f2ee (diff) |
Use atomic and synchronized sections to do proper accounting of replies. And some geenral code health.
Diffstat (limited to 'vespaclient-java/src/main/java/com/yahoo/dummyreceiver')
-rwxr-xr-x | vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java | 53 |
1 files changed, 24 insertions, 29 deletions
diff --git a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java index f8b785fd4e3..dd509d9e23a 100755 --- a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java +++ b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java @@ -20,39 +20,39 @@ import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.network.Identity; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import java.util.LinkedList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import static java.lang.System.out; public class DummyReceiver implements MessageHandler { - String name = null; - DestinationSession session; - MessageBusDocumentAccess da; - long sleepTime = 0; - long messageCount = 0; - int maxPendingCount = 0; - long silentNum = 0; - boolean instant = false; - ThreadPoolExecutor executor = null; - int threads = 10; - long maxQueueTime = -1; - BlockingQueue<Runnable> queue; - boolean verbose = false; + private String name = null; + private DestinationSession session; + private MessageBusDocumentAccess da; + private long sleepTime = 0; + private AtomicLong messageCount = new AtomicLong(0); + private long silentNum = 0; + private boolean instant = false; + private ThreadPoolExecutor executor = null; + private int threads = 10; + private long maxQueueTime = -1; + private BlockingQueue<Runnable> queue; + private boolean verbose = false; private boolean helpOption = false; - DummyReceiver() { + private DummyReceiver() { } - public class Task implements Runnable { + class Task implements Runnable { Reply reply; - public Task(Reply reply) { + Task(Reply reply) { this.reply = reply; } @@ -68,21 +68,21 @@ public class DummyReceiver implements MessageHandler { } } - public void init() { + private void init() { MessageBusParams params = new MessageBusParams(new LoadTypeSet()); params.setRPCNetworkParams(new RPCNetworkParams().setIdentity(new Identity(name))); params.setDocumentManagerConfigId("client"); - params.getMessageBusParams().setMaxPendingCount(maxPendingCount); + params.getMessageBusParams().setMaxPendingCount(0); params.getMessageBusParams().setMaxPendingSize(0); da = new MessageBusDocumentAccess(params); queue = (maxQueueTime < 0) ? new LinkedBlockingDeque<>() : new ThroughputLimitQueue<>(maxQueueTime); session = da.getMessageBus().createDestinationSession("default", true, this); executor = new ThreadPoolExecutor(threads, threads, 5, TimeUnit.SECONDS, queue, new DaemonThreadFactory()); - System.out.println("Registered listener at " + name + "/default with " + maxPendingCount + " max pending and sleep time of " + sleepTime); + System.out.println("Registered listener at " + name + "/default with 0 max pending and sleep time of " + sleepTime); } public void handleMessage(Message message) { - messageCount++; + long messageCount = this.messageCount.incrementAndGet(); if ( silentNum == 0 ) { System.out.println("Received message " + message + ". Received " + messageCount + " messages so far. In queue size " + queue.size()); @@ -116,7 +116,7 @@ public class DummyReceiver implements MessageHandler { } } - String getParam(List<String> args, String arg) throws IllegalArgumentException { + private String getParam(List<String> args, String arg) throws IllegalArgumentException { try { return args.remove(0); } catch (Exception e) { @@ -138,7 +138,7 @@ public class DummyReceiver implements MessageHandler { " --verbose If set, dump the contents of certain messages to stdout"); } - boolean parseArgs(List<String> args) { + private boolean parseArgs(List<String> args) { try { while (!args.isEmpty()) { String arg = args.remove(0); @@ -174,16 +174,11 @@ public class DummyReceiver implements MessageHandler { } } - public static void main(String[] args) { LogSetup.initVespaLogging("dummyreceiver"); DummyReceiver rcv = new DummyReceiver(); - List<String> l = new LinkedList<>(); - for (String arg : args) { - l.add(arg); - } - if (!rcv.parseArgs(l) && !rcv.helpOption) { + if (!rcv.parseArgs(Arrays.asList(args)) && !rcv.helpOption) { System.exit(1); } if (rcv.helpOption) { |