diff options
Diffstat (limited to 'vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java')
-rw-r--r-- | vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java | 49 |
1 files changed, 36 insertions, 13 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index e10dce31adf..e55cd27f7da 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.feed.perf; +import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentTypeManager; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; @@ -23,6 +24,9 @@ import com.yahoo.vespaxmlparser.VespaXMLFeedReader; import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -42,13 +46,14 @@ public class SimpleFeeder implements ReplyHandler { private final Route route; private final SourceSession session; private final long startTime = System.currentTimeMillis(); - private AtomicReference<Throwable> failure = new AtomicReference<>(null); - private AtomicLong numReplies = new AtomicLong(0); + private final AtomicReference<Throwable> failure = new AtomicReference<>(null); + private final AtomicLong numReplies = new AtomicLong(0); private long maxLatency = Long.MIN_VALUE; private long minLatency = Long.MAX_VALUE; private long nextHeader = startTime + HEADER_INTERVAL; private long nextReport = startTime + REPORT_INTERVAL; private long sumLatency = 0; + private final int numThreads; public static void main(String[] args) throws Throwable { new SimpleFeeder(new FeederParams().parseArgs(args)).run().close(); @@ -59,31 +64,49 @@ public class SimpleFeeder implements ReplyHandler { this.out = params.getStdOut(); this.err = params.getStdErr(); this.route = params.getRoute(); + this.numThreads = params.getNumDispatchThreads(); this.mbus = newMessageBus(docTypeMgr, params.getConfigId()); this.session = newSession(mbus, this, params.isSerialTransferEnabled()); this.docTypeMgr.configure(params.getConfigId()); } + private void sendOperation(VespaXMLFeedReader.Operation op) { + Message msg = newMessage(op); + if (msg == null) { + err.println("ignoring operation; " + op.getType()); + return; + } + msg.setContext(System.currentTimeMillis()); + msg.setRoute(route); + try { + Error err = session.sendBlocking(msg).getError(); + if (err != null) { + failure.set(new IOException(err.toString())); + } + } catch (InterruptedException e) {} + } + SimpleFeeder run() throws Throwable { + ExecutorService executor = (numThreads > 1) + ? new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, + new SynchronousQueue<>(false), + ThreadFactoryFactory.getDaemonThreadFactory("perf-feeder"), + new ThreadPoolExecutor.CallerRunsPolicy()) + : null; VespaXMLFeedReader reader = new VespaXMLFeedReader(in, docTypeMgr); - VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation(); + printHeader(); long numMessages = 0; while (failure.get() == null) { + VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation(); reader.read(op); if (op.getType() == VespaXMLFeedReader.OperationType.INVALID) { break; } - Message msg = newMessage(op); - if (msg == null) { - err.println("ignoring operation; " + op.getType()); - continue; // ignore - } - msg.setContext(System.currentTimeMillis()); - msg.setRoute(route); - Error err = session.sendBlocking(msg).getError(); - if (err != null) { - throw new IOException(err.toString()); + if (executor != null) { + executor.execute(() -> sendOperation(op)); + } else { + sendOperation(op); } ++numMessages; } |