diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-13 17:40:49 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-13 17:40:49 +0200 |
commit | df13ce307c399c649ae2fb2730557882380998c2 (patch) | |
tree | c20ce83733d8c911c1c78965dd9f36d61106901b /vespa_feed_perf | |
parent | 377749b786f6a24a67b4624d8114878117a4f2ee (diff) |
Use atomic and synchronized sections to do proper accounting of replies. And some geenral code health.
Diffstat (limited to 'vespa_feed_perf')
-rw-r--r-- | vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java | 40 |
1 files changed, 24 insertions, 16 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index e5b243c118a..e10dce31adf 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -3,7 +3,10 @@ package com.yahoo.vespa.feed.perf; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentTypeManager; -import com.yahoo.documentapi.messagebus.protocol.*; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; import com.yahoo.messagebus.Error; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.MessageBusParams; @@ -21,6 +24,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** * @author Simon Thoresen Hult @@ -37,20 +42,19 @@ public class SimpleFeeder implements ReplyHandler { private final Route route; private final SourceSession session; private final long startTime = System.currentTimeMillis(); - private volatile Throwable failure; - private volatile long numReplies = 0; + private AtomicReference<Throwable> failure = new AtomicReference<>(null); + private AtomicLong numReplies = new AtomicLong(0); private long maxLatency = Long.MIN_VALUE; private long minLatency = Long.MAX_VALUE; private long nextHeader = startTime + HEADER_INTERVAL; private long nextReport = startTime + REPORT_INTERVAL; - private long numMessages = 0; private long sumLatency = 0; public static void main(String[] args) throws Throwable { new SimpleFeeder(new FeederParams().parseArgs(args)).run().close(); } - public SimpleFeeder(FeederParams params) { + SimpleFeeder(FeederParams params) { this.in = params.getStdIn(); this.out = params.getStdOut(); this.err = params.getStdErr(); @@ -60,11 +64,12 @@ public class SimpleFeeder implements ReplyHandler { this.docTypeMgr.configure(params.getConfigId()); } - public SimpleFeeder run() throws Throwable { + SimpleFeeder run() throws Throwable { VespaXMLFeedReader reader = new VespaXMLFeedReader(in, docTypeMgr); VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation(); printHeader(); - while (failure == null) { + long numMessages = 0; + while (failure.get() == null) { reader.read(op); if (op.getType() == VespaXMLFeedReader.OperationType.INVALID) { break; @@ -82,17 +87,17 @@ public class SimpleFeeder implements ReplyHandler { } ++numMessages; } - while (failure == null && numReplies < numMessages) { + while (failure.get() == null && numReplies.get() < numMessages) { Thread.sleep(100); } - if (failure != null) { - throw failure; + if (failure.get() != null) { + throw failure.get(); } printReport(); return this; } - public void close() { + void close() { session.destroy(); mbus.destroy(); } @@ -121,19 +126,22 @@ public class SimpleFeeder implements ReplyHandler { @Override public void handleReply(Reply reply) { - if (failure != null) { + if (failure.get() != null) { return; } if (reply.hasErrors()) { - failure = new IOException(formatErrors(reply)); + failure.compareAndSet(null, new IOException(formatErrors(reply))); return; } long now = System.currentTimeMillis(); - long latency = now - (long)reply.getContext(); + long latency = now - (long) reply.getContext(); + numReplies.incrementAndGet(); + accumulateReplies(now, latency); + } + private synchronized void accumulateReplies(long now, long latency) { minLatency = Math.min(minLatency, latency); maxLatency = Math.max(maxLatency, latency); sumLatency += latency; - ++numReplies; if (now > nextHeader) { printHeader(); nextHeader += HEADER_INTERVAL; @@ -150,7 +158,7 @@ public class SimpleFeeder implements ReplyHandler { private void printReport() { out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - startTime, - numReplies, minLatency, sumLatency / numReplies, maxLatency); + numReplies.get(), minLatency, sumLatency / numReplies.get(), maxLatency); } private static String formatErrors(Reply reply) { |