summaryrefslogtreecommitdiffstats
path: root/vespaclient-java
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-13 17:40:49 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-13 17:40:49 +0200
commitdf13ce307c399c649ae2fb2730557882380998c2 (patch)
treec20ce83733d8c911c1c78965dd9f36d61106901b /vespaclient-java
parent377749b786f6a24a67b4624d8114878117a4f2ee (diff)
Use atomic and synchronized sections to do proper accounting of replies. And some geenral code health.
Diffstat (limited to 'vespaclient-java')
-rwxr-xr-xvespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java53
1 files changed, 24 insertions, 29 deletions
diff --git a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
index f8b785fd4e3..dd509d9e23a 100755
--- a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
+++ b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
@@ -20,39 +20,39 @@ import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.network.Identity;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import java.util.LinkedList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import static java.lang.System.out;
public class DummyReceiver implements MessageHandler {
- String name = null;
- DestinationSession session;
- MessageBusDocumentAccess da;
- long sleepTime = 0;
- long messageCount = 0;
- int maxPendingCount = 0;
- long silentNum = 0;
- boolean instant = false;
- ThreadPoolExecutor executor = null;
- int threads = 10;
- long maxQueueTime = -1;
- BlockingQueue<Runnable> queue;
- boolean verbose = false;
+ private String name = null;
+ private DestinationSession session;
+ private MessageBusDocumentAccess da;
+ private long sleepTime = 0;
+ private AtomicLong messageCount = new AtomicLong(0);
+ private long silentNum = 0;
+ private boolean instant = false;
+ private ThreadPoolExecutor executor = null;
+ private int threads = 10;
+ private long maxQueueTime = -1;
+ private BlockingQueue<Runnable> queue;
+ private boolean verbose = false;
private boolean helpOption = false;
- DummyReceiver() {
+ private DummyReceiver() {
}
- public class Task implements Runnable {
+ class Task implements Runnable {
Reply reply;
- public Task(Reply reply) {
+ Task(Reply reply) {
this.reply = reply;
}
@@ -68,21 +68,21 @@ public class DummyReceiver implements MessageHandler {
}
}
- public void init() {
+ private void init() {
MessageBusParams params = new MessageBusParams(new LoadTypeSet());
params.setRPCNetworkParams(new RPCNetworkParams().setIdentity(new Identity(name)));
params.setDocumentManagerConfigId("client");
- params.getMessageBusParams().setMaxPendingCount(maxPendingCount);
+ params.getMessageBusParams().setMaxPendingCount(0);
params.getMessageBusParams().setMaxPendingSize(0);
da = new MessageBusDocumentAccess(params);
queue = (maxQueueTime < 0) ? new LinkedBlockingDeque<>() : new ThroughputLimitQueue<>(maxQueueTime);
session = da.getMessageBus().createDestinationSession("default", true, this);
executor = new ThreadPoolExecutor(threads, threads, 5, TimeUnit.SECONDS, queue, new DaemonThreadFactory());
- System.out.println("Registered listener at " + name + "/default with " + maxPendingCount + " max pending and sleep time of " + sleepTime);
+ System.out.println("Registered listener at " + name + "/default with 0 max pending and sleep time of " + sleepTime);
}
public void handleMessage(Message message) {
- messageCount++;
+ long messageCount = this.messageCount.incrementAndGet();
if ( silentNum == 0 ) {
System.out.println("Received message " + message + ". Received " + messageCount + " messages so far. In queue size " + queue.size());
@@ -116,7 +116,7 @@ public class DummyReceiver implements MessageHandler {
}
}
- String getParam(List<String> args, String arg) throws IllegalArgumentException {
+ private String getParam(List<String> args, String arg) throws IllegalArgumentException {
try {
return args.remove(0);
} catch (Exception e) {
@@ -138,7 +138,7 @@ public class DummyReceiver implements MessageHandler {
" --verbose If set, dump the contents of certain messages to stdout");
}
- boolean parseArgs(List<String> args) {
+ private boolean parseArgs(List<String> args) {
try {
while (!args.isEmpty()) {
String arg = args.remove(0);
@@ -174,16 +174,11 @@ public class DummyReceiver implements MessageHandler {
}
}
-
public static void main(String[] args) {
LogSetup.initVespaLogging("dummyreceiver");
DummyReceiver rcv = new DummyReceiver();
- List<String> l = new LinkedList<>();
- for (String arg : args) {
- l.add(arg);
- }
- if (!rcv.parseArgs(l) && !rcv.helpOption) {
+ if (!rcv.parseArgs(Arrays.asList(args)) && !rcv.helpOption) {
System.exit(1);
}
if (rcv.helpOption) {