aboutsummaryrefslogtreecommitdiffstats
path: root/vespa_feed_perf
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-25 19:57:57 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-25 19:57:57 +0200
commit907158959c159904c3f4d20274c565806746ac15 (patch)
treef4db3e3d2fc81cf061cb1811b5bc3af4ca91695f /vespa_feed_perf
parent249fe76c9437d0f1a033294df98d8d8101baef2c (diff)
Refactor to allow for lazy decode.
Diffstat (limited to 'vespa_feed_perf')
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java33
1 files changed, 17 insertions, 16 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
index 1fdbd2db9c0..9e32033634a 100644
--- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
@@ -31,6 +31,7 @@ import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.vespaxmlparser.FeedReader;
+import com.yahoo.vespaxmlparser.FeedOperation;
import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
import net.jpountz.xxhash.XXHashFactory;
@@ -75,7 +76,7 @@ public class SimpleFeeder implements ReplyHandler {
}
private interface Destination {
- void send(VespaXMLFeedReader.Operation op);
+ void send(FeedOperation op);
void close() throws Exception;
}
@@ -90,7 +91,7 @@ public class SimpleFeeder implements ReplyHandler {
this.session = session;
this.failure = failure;
}
- public void send(VespaXMLFeedReader.Operation op) {
+ public void send(FeedOperation op) {
Message msg = newMessage(op);
if (msg == null) {
err.println("ignoring operation; " + op.getType());
@@ -128,8 +129,8 @@ public class SimpleFeeder implements ReplyHandler {
failure.set(e);
}
}
- public void send(VespaXMLFeedReader.Operation op) {
- if (op.getType() == VespaXMLFeedReader.OperationType.DOCUMENT) {
+ public void send(FeedOperation op) {
+ if (op.getType() == FeedOperation.Type.DOCUMENT) {
if (!isFirst) {
try {
outputStream.write(',');
@@ -173,16 +174,16 @@ public class SimpleFeeder implements ReplyHandler {
failure.set(e);
}
}
- public void send(VespaXMLFeedReader.Operation op) {
+ public void send(FeedOperation op) {
DocumentSerializer writer = DocumentSerializerFactory.createHead(buffer);
int type = NONE;
- if (op.getType() == VespaXMLFeedReader.OperationType.DOCUMENT) {
+ if (op.getType() == FeedOperation.Type.DOCUMENT) {
writer.write(op.getDocument());
type = DOCUMENT;
- } else if (op.getType() == VespaXMLFeedReader.OperationType.UPDATE) {
+ } else if (op.getType() == FeedOperation.Type.UPDATE) {
writer.write(op.getDocumentUpdate());
type = UPDATE;
- } else if (op.getType() == VespaXMLFeedReader.OperationType.REMOVE) {
+ } else if (op.getType() == FeedOperation.Type.REMOVE) {
writer.write(op.getRemove());
type = REMOVE;
}
@@ -224,11 +225,11 @@ public class SimpleFeeder implements ReplyHandler {
}
}
@Override
- public void read(VespaXMLFeedReader.Operation operation) throws Exception {
+ public FeedOperation read() throws Exception {
+ VespaXMLFeedReader.Operation operation = new VespaXMLFeedReader.Operation();
int read = in.read(prefix);
if (read != prefix.length) {
- operation.setInvalid();
- return;
+ return operation;
}
ByteBuffer header = ByteBuffer.wrap(prefix);
int sz = header.getInt();
@@ -253,6 +254,7 @@ public class SimpleFeeder implements ReplyHandler {
} else {
throw new IllegalArgumentException("Unknown operation " + type);
}
+ return operation;
}
}
@@ -274,7 +276,7 @@ public class SimpleFeeder implements ReplyHandler {
: new MbusDestination(session, params.getRoute(), failure, params.getStdErr());
}
- private void sendOperation(VespaXMLFeedReader.Operation op) {
+ private void sendOperation(FeedOperation op) {
destination.send(op);
}
@@ -308,9 +310,8 @@ public class SimpleFeeder implements ReplyHandler {
printHeader();
long numMessages = 0;
while (failure.get() == null) {
- VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation();
- reader.read(op);
- if (op.getType() == VespaXMLFeedReader.OperationType.INVALID) {
+ FeedOperation op = reader.read();
+ if (op.getType() == FeedOperation.Type.INVALID) {
break;
}
if (executor != null) {
@@ -335,7 +336,7 @@ public class SimpleFeeder implements ReplyHandler {
mbus.destroy();
}
- private static Message newMessage(VespaXMLFeedReader.Operation op) {
+ private static Message newMessage(FeedOperation op) {
switch (op.getType()) {
case DOCUMENT: {
PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(op.getDocument()));