summaryrefslogtreecommitdiffstats
path: root/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java')
-rwxr-xr-xvespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java196
1 files changed, 196 insertions, 0 deletions
diff --git a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
new file mode 100755
index 00000000000..7b104455f15
--- /dev/null
+++ b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
@@ -0,0 +1,196 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.dummyreceiver;
+
+import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.documentapi.ThroughputLimitQueue;
+import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess;
+import com.yahoo.documentapi.messagebus.MessageBusParams;
+import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.log.LogSetup;
+import com.yahoo.messagebus.DestinationSession;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.System.out;
+
+public class DummyReceiver implements MessageHandler {
+ String name = null;
+ DestinationSession session;
+ MessageBusDocumentAccess da;
+ long sleepTime = 0;
+ long messageCount = 0;
+ int maxPendingCount = 0;
+ long silentNum = 0;
+ boolean instant = false;
+ ThreadPoolExecutor executor = null;
+ int threads = 10;
+ long maxQueueTime = -1;
+ BlockingQueue<Runnable> queue;
+ boolean verbose = false;
+
+ DummyReceiver() {
+ }
+
+ public class Task implements Runnable {
+ Reply reply;
+
+ public Task(Reply reply) {
+ this.reply = reply;
+ }
+
+ public void run() {
+ if (sleepTime > 0) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ session.reply(reply);
+ }
+ }
+
+ public void init() {
+ MessageBusParams params = new MessageBusParams(new LoadTypeSet());
+ params.setRPCNetworkParams(new RPCNetworkParams().setIdentity(new Identity(name)));
+ params.setDocumentManagerConfigId("client");
+ params.getMessageBusParams().setMaxPendingCount(maxPendingCount);
+ params.getMessageBusParams().setMaxPendingSize(0);
+ da = new MessageBusDocumentAccess(params);
+ queue = (maxQueueTime < 0) ? new LinkedBlockingDeque<>() : new ThroughputLimitQueue<>(maxQueueTime);
+ session = da.getMessageBus().createDestinationSession("default", true, this);
+ executor = new ThreadPoolExecutor(threads, threads, 5, TimeUnit.SECONDS, queue, new DaemonThreadFactory());
+ System.out.println("Registered listener at " + name + "/default with " + maxPendingCount + " max pending and sleep time of " + sleepTime);
+ }
+
+ public void handleMessage(Message message) {
+ messageCount++;
+ if ( silentNum == 0 ) {
+ System.out.println("Received message " + message + ". Received " + messageCount + " messages so far. In queue size " + queue.size());
+
+ if (verbose) {
+ if (message instanceof PutDocumentMessage) {
+ System.out.println(" Document:\n" + ((PutDocumentMessage) message).getDocumentPut().getDocument().toXML(" "));
+ } else if (message instanceof RemoveDocumentMessage) {
+ System.out.println(" Document id: " + ((RemoveDocumentMessage) message).getDocumentId());
+ } else if (message instanceof UpdateDocumentMessage) {
+ System.out.println(" Update:\n " + ((UpdateDocumentMessage) message).getDocumentUpdate().toString());
+ }
+ }
+ } else {
+ if ((messageCount % silentNum) == 0) {
+ System.out.println("Received " + messageCount + " messages so far. In queue size " + queue.size());
+ }
+ }
+
+ EmptyReply reply = new EmptyReply();
+ message.swapState(reply);
+
+ if ( ! instant ) {
+ try {
+ executor.execute(new Task(reply));
+ } catch (RejectedExecutionException e) {
+ reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session " + name + "/default is busy"));
+ session.reply(reply);
+ }
+ } else {
+ session.reply(reply);
+ }
+ }
+
+ String getParam(List<String> args, String arg) throws IllegalArgumentException {
+ try {
+ return args.remove(0);
+ } catch (Exception e) {
+ System.err.println("--" + arg + " requires an argument");
+ throw new IllegalArgumentException(arg);
+ }
+ }
+
+ public void help() {
+ out.println("Simple receiver for messagebus messages. Prints the messages received to stdout.\n" +
+ "\n" +
+ "The options are:\n" +
+ " --instant Reply in message thread." +
+ " --name arg Slobrok name to register\n" +
+ " --maxqueuetime arg Adjust the in queue size to have a maximum queue wait period of this many ms (default -1 = unlimited)\n" +
+ " --silent #nummsg Do not dump anything, but progress every #nummsg\n" +
+ " --sleeptime arg The number of milliseconds to sleep per message, to simulate processing time\n" +
+ " --threads arg The number of threads to process the incoming data\n" +
+ " --verbose If set, dump the contents of certain messages to stdout");
+ }
+
+ boolean parseArgs(List<String> args) {
+ try {
+ while (!args.isEmpty()) {
+ String arg = args.remove(0);
+
+ if (arg.equals("-h") || arg.equals("--help")) {
+ help();
+ return false;
+ } else if ("--name".equals(arg)) {
+ name = getParam(args, arg);
+ } else if ("--sleeptime".equals(arg)) {
+ sleepTime = Long.parseLong(getParam(args, arg));
+ } else if ("--instant".equals(arg)) {
+ instant = true;
+ } else if ("--silent".equals(arg)) {
+ silentNum = Long.parseLong(getParam(args, arg));
+ } else if ("--maxqueuetime".equals(arg)) {
+ maxQueueTime = Long.parseLong(getParam(args, arg));
+ } else if ("--threads".equals(arg)) {
+ threads = Integer.parseInt(getParam(args, arg));
+ } else if ("--verbose".equals(arg)) {
+ verbose = true;
+ } else {
+ help();
+ return false;
+ }
+ }
+
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+
+
+ public static void main(String[] args) {
+ LogSetup.initVespaLogging("dummyreceiver");
+ DummyReceiver rcv = new DummyReceiver();
+
+ List<String> l = new LinkedList<>();
+ for (String arg : args) {
+ l.add(arg);
+ }
+ if (!rcv.parseArgs(l)) {
+ System.exit(1);
+ }
+
+ rcv.init();
+ while (true) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}