aboutsummaryrefslogtreecommitdiffstats
path: root/vespa_feed_perf
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-13 17:40:49 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-13 17:40:49 +0200
commitdf13ce307c399c649ae2fb2730557882380998c2 (patch)
treec20ce83733d8c911c1c78965dd9f36d61106901b /vespa_feed_perf
parent377749b786f6a24a67b4624d8114878117a4f2ee (diff)
Use atomic and synchronized sections to do proper accounting of replies. And some geenral code health.
Diffstat (limited to 'vespa_feed_perf')
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java40
1 files changed, 24 insertions, 16 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
index e5b243c118a..e10dce31adf 100644
--- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
@@ -3,7 +3,10 @@ package com.yahoo.vespa.feed.perf;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentTypeManager;
-import com.yahoo.documentapi.messagebus.protocol.*;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.MessageBusParams;
@@ -21,6 +24,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
/**
* @author Simon Thoresen Hult
@@ -37,20 +42,19 @@ public class SimpleFeeder implements ReplyHandler {
private final Route route;
private final SourceSession session;
private final long startTime = System.currentTimeMillis();
- private volatile Throwable failure;
- private volatile long numReplies = 0;
+ private AtomicReference<Throwable> failure = new AtomicReference<>(null);
+ private AtomicLong numReplies = new AtomicLong(0);
private long maxLatency = Long.MIN_VALUE;
private long minLatency = Long.MAX_VALUE;
private long nextHeader = startTime + HEADER_INTERVAL;
private long nextReport = startTime + REPORT_INTERVAL;
- private long numMessages = 0;
private long sumLatency = 0;
public static void main(String[] args) throws Throwable {
new SimpleFeeder(new FeederParams().parseArgs(args)).run().close();
}
- public SimpleFeeder(FeederParams params) {
+ SimpleFeeder(FeederParams params) {
this.in = params.getStdIn();
this.out = params.getStdOut();
this.err = params.getStdErr();
@@ -60,11 +64,12 @@ public class SimpleFeeder implements ReplyHandler {
this.docTypeMgr.configure(params.getConfigId());
}
- public SimpleFeeder run() throws Throwable {
+ SimpleFeeder run() throws Throwable {
VespaXMLFeedReader reader = new VespaXMLFeedReader(in, docTypeMgr);
VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation();
printHeader();
- while (failure == null) {
+ long numMessages = 0;
+ while (failure.get() == null) {
reader.read(op);
if (op.getType() == VespaXMLFeedReader.OperationType.INVALID) {
break;
@@ -82,17 +87,17 @@ public class SimpleFeeder implements ReplyHandler {
}
++numMessages;
}
- while (failure == null && numReplies < numMessages) {
+ while (failure.get() == null && numReplies.get() < numMessages) {
Thread.sleep(100);
}
- if (failure != null) {
- throw failure;
+ if (failure.get() != null) {
+ throw failure.get();
}
printReport();
return this;
}
- public void close() {
+ void close() {
session.destroy();
mbus.destroy();
}
@@ -121,19 +126,22 @@ public class SimpleFeeder implements ReplyHandler {
@Override
public void handleReply(Reply reply) {
- if (failure != null) {
+ if (failure.get() != null) {
return;
}
if (reply.hasErrors()) {
- failure = new IOException(formatErrors(reply));
+ failure.compareAndSet(null, new IOException(formatErrors(reply)));
return;
}
long now = System.currentTimeMillis();
- long latency = now - (long)reply.getContext();
+ long latency = now - (long) reply.getContext();
+ numReplies.incrementAndGet();
+ accumulateReplies(now, latency);
+ }
+ private synchronized void accumulateReplies(long now, long latency) {
minLatency = Math.min(minLatency, latency);
maxLatency = Math.max(maxLatency, latency);
sumLatency += latency;
- ++numReplies;
if (now > nextHeader) {
printHeader();
nextHeader += HEADER_INTERVAL;
@@ -150,7 +158,7 @@ public class SimpleFeeder implements ReplyHandler {
private void printReport() {
out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - startTime,
- numReplies, minLatency, sumLatency / numReplies, maxLatency);
+ numReplies.get(), minLatency, sumLatency / numReplies.get(), maxLatency);
}
private static String formatErrors(Reply reply) {