From 907158959c159904c3f4d20274c565806746ac15 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 25 Apr 2019 19:57:57 +0200 Subject: Refactor to allow for lazy decode. --- .../com/yahoo/vespa/feed/perf/SimpleFeeder.java | 33 +++++++++++----------- 1 file changed, 17 insertions(+), 16 deletions(-) (limited to 'vespa_feed_perf/src') diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index 1fdbd2db9c0..9e32033634a 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -31,6 +31,7 @@ import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; import com.yahoo.messagebus.routing.Route; import com.yahoo.vespaxmlparser.FeedReader; +import com.yahoo.vespaxmlparser.FeedOperation; import com.yahoo.vespaxmlparser.VespaXMLFeedReader; import net.jpountz.xxhash.XXHashFactory; @@ -75,7 +76,7 @@ public class SimpleFeeder implements ReplyHandler { } private interface Destination { - void send(VespaXMLFeedReader.Operation op); + void send(FeedOperation op); void close() throws Exception; } @@ -90,7 +91,7 @@ public class SimpleFeeder implements ReplyHandler { this.session = session; this.failure = failure; } - public void send(VespaXMLFeedReader.Operation op) { + public void send(FeedOperation op) { Message msg = newMessage(op); if (msg == null) { err.println("ignoring operation; " + op.getType()); @@ -128,8 +129,8 @@ public class SimpleFeeder implements ReplyHandler { failure.set(e); } } - public void send(VespaXMLFeedReader.Operation op) { - if (op.getType() == VespaXMLFeedReader.OperationType.DOCUMENT) { + public void send(FeedOperation op) { + if (op.getType() == FeedOperation.Type.DOCUMENT) { if (!isFirst) { try { outputStream.write(','); @@ -173,16 +174,16 @@ public class SimpleFeeder implements ReplyHandler { failure.set(e); } } - public void send(VespaXMLFeedReader.Operation op) { + public void send(FeedOperation op) { DocumentSerializer writer = DocumentSerializerFactory.createHead(buffer); int type = NONE; - if (op.getType() == VespaXMLFeedReader.OperationType.DOCUMENT) { + if (op.getType() == FeedOperation.Type.DOCUMENT) { writer.write(op.getDocument()); type = DOCUMENT; - } else if (op.getType() == VespaXMLFeedReader.OperationType.UPDATE) { + } else if (op.getType() == FeedOperation.Type.UPDATE) { writer.write(op.getDocumentUpdate()); type = UPDATE; - } else if (op.getType() == VespaXMLFeedReader.OperationType.REMOVE) { + } else if (op.getType() == FeedOperation.Type.REMOVE) { writer.write(op.getRemove()); type = REMOVE; } @@ -224,11 +225,11 @@ public class SimpleFeeder implements ReplyHandler { } } @Override - public void read(VespaXMLFeedReader.Operation operation) throws Exception { + public FeedOperation read() throws Exception { + VespaXMLFeedReader.Operation operation = new VespaXMLFeedReader.Operation(); int read = in.read(prefix); if (read != prefix.length) { - operation.setInvalid(); - return; + return operation; } ByteBuffer header = ByteBuffer.wrap(prefix); int sz = header.getInt(); @@ -253,6 +254,7 @@ public class SimpleFeeder implements ReplyHandler { } else { throw new IllegalArgumentException("Unknown operation " + type); } + return operation; } } @@ -274,7 +276,7 @@ public class SimpleFeeder implements ReplyHandler { : new MbusDestination(session, params.getRoute(), failure, params.getStdErr()); } - private void sendOperation(VespaXMLFeedReader.Operation op) { + private void sendOperation(FeedOperation op) { destination.send(op); } @@ -308,9 +310,8 @@ public class SimpleFeeder implements ReplyHandler { printHeader(); long numMessages = 0; while (failure.get() == null) { - VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation(); - reader.read(op); - if (op.getType() == VespaXMLFeedReader.OperationType.INVALID) { + FeedOperation op = reader.read(); + if (op.getType() == FeedOperation.Type.INVALID) { break; } if (executor != null) { @@ -335,7 +336,7 @@ public class SimpleFeeder implements ReplyHandler { mbus.destroy(); } - private static Message newMessage(VespaXMLFeedReader.Operation op) { + private static Message newMessage(FeedOperation op) { switch (op.getType()) { case DOCUMENT: { PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(op.getDocument())); -- cgit v1.2.3