diff options
Diffstat (limited to 'vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java')
-rw-r--r-- | vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java | 47 |
1 files changed, 33 insertions, 14 deletions
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index b4ca98f316f..386468e85e8 100644 --- a/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.feed.perf; import com.yahoo.concurrent.ThreadFactoryFactory; -import com.yahoo.config.subscription.ConfigSubscriber; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentPut; @@ -66,7 +65,6 @@ import java.util.stream.Stream; public class SimpleFeeder implements ReplyHandler { private final DocumentTypeManager docTypeMgr = new DocumentTypeManager(); - private final ConfigSubscriber documentTypeConfigSubscriber; private final List<InputStream> inputStreams; private final PrintStream out; private final RPCMessageBus mbus; @@ -185,21 +183,40 @@ public class SimpleFeeder implements ReplyHandler { } } public void send(FeedOperation op) { - if (op.getType() == FeedOperation.Type.DOCUMENT) { - if (!isFirst) { - try { - outputStream.write(','); - outputStream.write('\n'); - } catch (IOException e) { - failure.set(e); - } - } else { - isFirst = false; + switch (op.getType()) { + case DOCUMENT -> { + addCommaAndNewline(); + writer.write(op.getDocumentPut().getDocument()); } - writer.write(op.getDocumentPut().getDocument()); + case REMOVE -> { + addCommaAndNewline(); + writer.write(op.getDocumentRemove()); + } + case UPDATE -> { + addCommaAndNewline(); + writer.write(op.getDocumentUpdate()); + } + default -> { /* TODO: No more operations supported yet */ } } numReplies.incrementAndGet(); } + + private void addCommaAndNewline() { + try { + if (isFirst) { + outputStream.write(' '); + outputStream.write(' '); + isFirst = false; + } else { + outputStream.write(','); + outputStream.write('\n'); + outputStream.write(' '); + } + } catch (IOException e) { + failure.set(e); + } + } + public void close() throws Exception { outputStream.write('\n'); outputStream.write(']'); @@ -378,7 +395,7 @@ public class SimpleFeeder implements ReplyHandler { numMessagesToSend = params.getNumMessagesToSend(); mbus = newMessageBus(docTypeMgr, params); session = newSession(mbus, this, params); - documentTypeConfigSubscriber = DocumentTypeManagerConfigurer.configure(docTypeMgr, params.getConfigId()); + DocumentTypeManagerConfigurer.configure(docTypeMgr, params.getConfigId()); benchmarkMode = params.isBenchmarkMode(); destination = (params.getDumpStream() != null) ? createDumper(params) @@ -469,6 +486,7 @@ public class SimpleFeeder implements ReplyHandler { numReplies.incrementAndGet(); accumulateReplies(now, latency); } + private synchronized void accumulateReplies(long now, long latency) { minLatency = Math.min(minLatency, latency); maxLatency = Math.max(maxLatency, latency); @@ -479,6 +497,7 @@ public class SimpleFeeder implements ReplyHandler { nextReport += REPORT_INTERVAL; } } + private static void printHeader(PrintStream out) { out.println("# Time used, num ok, num error, min latency, max latency, average latency"); } |