summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--jrt/src/com/yahoo/jrt/Connection.java2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java7
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java1
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendAdapter.java4
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java40
-rwxr-xr-xvespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java53
6 files changed, 56 insertions, 51 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connection.java b/jrt/src/com/yahoo/jrt/Connection.java
index 36e1f1c8a8e..d4938f8ecbb 100644
--- a/jrt/src/com/yahoo/jrt/Connection.java
+++ b/jrt/src/com/yahoo/jrt/Connection.java
@@ -1,8 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt;
-import com.yahoo.jrt.CryptoSocket.HandshakeResult;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index 0f0b704bba7..393cf420ae1 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -7,7 +7,12 @@ import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.metrics.MessageBusMetricSet;
import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.NetworkOwner;
-import com.yahoo.messagebus.routing.*;
+import com.yahoo.messagebus.routing.Resender;
+import com.yahoo.messagebus.routing.RetryPolicy;
+import com.yahoo.messagebus.routing.RoutingPolicy;
+import com.yahoo.messagebus.routing.RoutingSpec;
+import com.yahoo.messagebus.routing.RoutingTable;
+import com.yahoo.messagebus.routing.RoutingTableSpec;
import com.yahoo.text.Utf8Array;
import com.yahoo.text.Utf8String;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index ab741b36a05..fdd72c12532 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -17,7 +17,6 @@ import com.yahoo.jrt.Transport;
import com.yahoo.jrt.slobrok.api.IMirror;
import com.yahoo.jrt.slobrok.api.Mirror;
import com.yahoo.jrt.slobrok.api.Register;
-import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.ErrorCode;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendAdapter.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendAdapter.java
index dbdb6da6477..749ba4f4451 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendAdapter.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendAdapter.java
@@ -17,7 +17,7 @@ public interface RPCSendAdapter {
*
* @param net The network to attach to.
*/
- public void attach(RPCNetwork net);
+ void attach(RPCNetwork net);
/**
* Performs the actual sending to the given recipient.
@@ -27,5 +27,5 @@ public interface RPCSendAdapter {
* @param payload The already serialized payload of the message to send.
* @param timeRemaining The time remaining until the message expires.
*/
- public void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining);
+ void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining);
}
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
index e5b243c118a..e10dce31adf 100644
--- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
@@ -3,7 +3,10 @@ package com.yahoo.vespa.feed.perf;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentTypeManager;
-import com.yahoo.documentapi.messagebus.protocol.*;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.MessageBusParams;
@@ -21,6 +24,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
/**
* @author Simon Thoresen Hult
@@ -37,20 +42,19 @@ public class SimpleFeeder implements ReplyHandler {
private final Route route;
private final SourceSession session;
private final long startTime = System.currentTimeMillis();
- private volatile Throwable failure;
- private volatile long numReplies = 0;
+ private AtomicReference<Throwable> failure = new AtomicReference<>(null);
+ private AtomicLong numReplies = new AtomicLong(0);
private long maxLatency = Long.MIN_VALUE;
private long minLatency = Long.MAX_VALUE;
private long nextHeader = startTime + HEADER_INTERVAL;
private long nextReport = startTime + REPORT_INTERVAL;
- private long numMessages = 0;
private long sumLatency = 0;
public static void main(String[] args) throws Throwable {
new SimpleFeeder(new FeederParams().parseArgs(args)).run().close();
}
- public SimpleFeeder(FeederParams params) {
+ SimpleFeeder(FeederParams params) {
this.in = params.getStdIn();
this.out = params.getStdOut();
this.err = params.getStdErr();
@@ -60,11 +64,12 @@ public class SimpleFeeder implements ReplyHandler {
this.docTypeMgr.configure(params.getConfigId());
}
- public SimpleFeeder run() throws Throwable {
+ SimpleFeeder run() throws Throwable {
VespaXMLFeedReader reader = new VespaXMLFeedReader(in, docTypeMgr);
VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation();
printHeader();
- while (failure == null) {
+ long numMessages = 0;
+ while (failure.get() == null) {
reader.read(op);
if (op.getType() == VespaXMLFeedReader.OperationType.INVALID) {
break;
@@ -82,17 +87,17 @@ public class SimpleFeeder implements ReplyHandler {
}
++numMessages;
}
- while (failure == null && numReplies < numMessages) {
+ while (failure.get() == null && numReplies.get() < numMessages) {
Thread.sleep(100);
}
- if (failure != null) {
- throw failure;
+ if (failure.get() != null) {
+ throw failure.get();
}
printReport();
return this;
}
- public void close() {
+ void close() {
session.destroy();
mbus.destroy();
}
@@ -121,19 +126,22 @@ public class SimpleFeeder implements ReplyHandler {
@Override
public void handleReply(Reply reply) {
- if (failure != null) {
+ if (failure.get() != null) {
return;
}
if (reply.hasErrors()) {
- failure = new IOException(formatErrors(reply));
+ failure.compareAndSet(null, new IOException(formatErrors(reply)));
return;
}
long now = System.currentTimeMillis();
- long latency = now - (long)reply.getContext();
+ long latency = now - (long) reply.getContext();
+ numReplies.incrementAndGet();
+ accumulateReplies(now, latency);
+ }
+ private synchronized void accumulateReplies(long now, long latency) {
minLatency = Math.min(minLatency, latency);
maxLatency = Math.max(maxLatency, latency);
sumLatency += latency;
- ++numReplies;
if (now > nextHeader) {
printHeader();
nextHeader += HEADER_INTERVAL;
@@ -150,7 +158,7 @@ public class SimpleFeeder implements ReplyHandler {
private void printReport() {
out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - startTime,
- numReplies, minLatency, sumLatency / numReplies, maxLatency);
+ numReplies.get(), minLatency, sumLatency / numReplies.get(), maxLatency);
}
private static String formatErrors(Reply reply) {
diff --git a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
index f8b785fd4e3..dd509d9e23a 100755
--- a/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
+++ b/vespaclient-java/src/main/java/com/yahoo/dummyreceiver/DummyReceiver.java
@@ -20,39 +20,39 @@ import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.network.Identity;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import java.util.LinkedList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import static java.lang.System.out;
public class DummyReceiver implements MessageHandler {
- String name = null;
- DestinationSession session;
- MessageBusDocumentAccess da;
- long sleepTime = 0;
- long messageCount = 0;
- int maxPendingCount = 0;
- long silentNum = 0;
- boolean instant = false;
- ThreadPoolExecutor executor = null;
- int threads = 10;
- long maxQueueTime = -1;
- BlockingQueue<Runnable> queue;
- boolean verbose = false;
+ private String name = null;
+ private DestinationSession session;
+ private MessageBusDocumentAccess da;
+ private long sleepTime = 0;
+ private AtomicLong messageCount = new AtomicLong(0);
+ private long silentNum = 0;
+ private boolean instant = false;
+ private ThreadPoolExecutor executor = null;
+ private int threads = 10;
+ private long maxQueueTime = -1;
+ private BlockingQueue<Runnable> queue;
+ private boolean verbose = false;
private boolean helpOption = false;
- DummyReceiver() {
+ private DummyReceiver() {
}
- public class Task implements Runnable {
+ class Task implements Runnable {
Reply reply;
- public Task(Reply reply) {
+ Task(Reply reply) {
this.reply = reply;
}
@@ -68,21 +68,21 @@ public class DummyReceiver implements MessageHandler {
}
}
- public void init() {
+ private void init() {
MessageBusParams params = new MessageBusParams(new LoadTypeSet());
params.setRPCNetworkParams(new RPCNetworkParams().setIdentity(new Identity(name)));
params.setDocumentManagerConfigId("client");
- params.getMessageBusParams().setMaxPendingCount(maxPendingCount);
+ params.getMessageBusParams().setMaxPendingCount(0);
params.getMessageBusParams().setMaxPendingSize(0);
da = new MessageBusDocumentAccess(params);
queue = (maxQueueTime < 0) ? new LinkedBlockingDeque<>() : new ThroughputLimitQueue<>(maxQueueTime);
session = da.getMessageBus().createDestinationSession("default", true, this);
executor = new ThreadPoolExecutor(threads, threads, 5, TimeUnit.SECONDS, queue, new DaemonThreadFactory());
- System.out.println("Registered listener at " + name + "/default with " + maxPendingCount + " max pending and sleep time of " + sleepTime);
+ System.out.println("Registered listener at " + name + "/default with 0 max pending and sleep time of " + sleepTime);
}
public void handleMessage(Message message) {
- messageCount++;
+ long messageCount = this.messageCount.incrementAndGet();
if ( silentNum == 0 ) {
System.out.println("Received message " + message + ". Received " + messageCount + " messages so far. In queue size " + queue.size());
@@ -116,7 +116,7 @@ public class DummyReceiver implements MessageHandler {
}
}
- String getParam(List<String> args, String arg) throws IllegalArgumentException {
+ private String getParam(List<String> args, String arg) throws IllegalArgumentException {
try {
return args.remove(0);
} catch (Exception e) {
@@ -138,7 +138,7 @@ public class DummyReceiver implements MessageHandler {
" --verbose If set, dump the contents of certain messages to stdout");
}
- boolean parseArgs(List<String> args) {
+ private boolean parseArgs(List<String> args) {
try {
while (!args.isEmpty()) {
String arg = args.remove(0);
@@ -174,16 +174,11 @@ public class DummyReceiver implements MessageHandler {
}
}
-
public static void main(String[] args) {
LogSetup.initVespaLogging("dummyreceiver");
DummyReceiver rcv = new DummyReceiver();
- List<String> l = new LinkedList<>();
- for (String arg : args) {
- l.add(arg);
- }
- if (!rcv.parseArgs(l) && !rcv.helpOption) {
+ if (!rcv.parseArgs(Arrays.asList(args)) && !rcv.helpOption) {
System.exit(1);
}
if (rcv.helpOption) {