author    Kristian Aune <kkraune@users.noreply.github.com>  2017-03-17 09:45:12 +0100
committer GitHub <noreply@github.com>  2017-03-17 09:45:12 +0100
commit    b09d7deb64e5a723c5a052dd2b1db225f632405f (patch)
tree      f9a240cc40b06e0662175a9101a56cf8edd6780b
parent    772090db5b9a3bf4dfa34d1385e3c549bee8fa95 (diff)
parent    8bbdd803f7bd67d51b8dc7f0abc906f793b42520 (diff)
Merge pull request #2023 from yahoo/lesters/move-vespa-hadoop
Move vespa-hadoop to Vespa repository
-rw-r--r--  hadoop/README                                                                    |  7
-rw-r--r--  hadoop/pom.xml                                                                   | 39
-rw-r--r--  hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java     |  2
-rw-r--r--  hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java     | 78
-rw-r--r--  hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java | 98
-rw-r--r--  hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java       |  2
-rw-r--r--  hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java | 52
-rw-r--r--  hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java  | 31
-rw-r--r--  hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java      | 12
-rw-r--r--  hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java                  |  8
-rw-r--r--  hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java       | 62
-rw-r--r--  hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java                |  1
-rw-r--r--  hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java               |  1
-rw-r--r--  hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java  |  1
-rw-r--r--  hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java              | 32
-rw-r--r--  hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java            | 58
-rw-r--r--  hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java           | 14
-rw-r--r--  hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java             | 14
-rw-r--r--  hadoop/src/test/pig/feed_multiline_operations.pig                                | 14
-rw-r--r--  hadoop/src/test/pig/feed_operations_with_json_loader.pig                        | 13
-rw-r--r--  hadoop/src/test/pig/feed_operations_xml.pig                                      | 10
-rw-r--r--  hadoop/src/test/pig/query_alt_root.pig                                           | 19
-rw-r--r--  hadoop/src/test/resources/operations_data.xml                                    | 13
-rw-r--r--  hadoop/src/test/resources/operations_multiline_data.json                         | 93
24 files changed, 600 insertions(+), 74 deletions(-)
diff --git a/hadoop/README b/hadoop/README
index 66ce7113337..1b567b88c1d 100644
--- a/hadoop/README
+++ b/hadoop/README
@@ -1,7 +1,4 @@
-The grid feeding client.
+The Vespa Hadoop client.
-TODO: the code here is currently not the code that is built and distributed.
-That is in https://git.corp.yahoo.com/vespa/vespa-hadoop. Soon, that code will
-be moved here as it has a close relationship to the Java feed client. Also,
-system tests will be added.
+Contains APIs for feeding and querying Vespa from the grid.
diff --git a/hadoop/pom.xml b/hadoop/pom.xml
index e12b9cbd86a..f23b73abc54 100644
--- a/hadoop/pom.xml
+++ b/hadoop/pom.xml
@@ -15,11 +15,12 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <hadoop.version>2.6.0</hadoop.version>
+ <hadoop.version>2.7.3</hadoop.version>
<pig.version>0.14.0</pig.version>
</properties>
<dependencies>
+ <!-- Hadoop dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
@@ -40,6 +41,33 @@
<scope>provided</scope>
</dependency>
+ <!-- These are inherited from parent. Needed for correct versions on Hadoop. -->
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>2.5.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.4.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.4.1</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <!-- Test dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
@@ -53,6 +81,7 @@
<scope>test</scope>
</dependency>
+ <!-- Vespa feeding dependencies -->
<dependency>
<groupId>com.yahoo.vespa</groupId>
<artifactId>vespa-http-client</artifactId>
@@ -66,10 +95,12 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <version>3.3</version>
<configuration>
- <source>1.8</source>
- <target>1.8</target>
+ <showDeprecation>true</showDeprecation>
+ <compilerArgs>
+ <arg>-Xlint:all</arg>
+ <arg>-Werror</arg>
+ </compilerArgs>
</configuration>
</plugin>
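
The stricter compiler settings are what motivate the @SuppressWarnings("rawtypes") annotations added across this change: with -Xlint:all and -Werror, a rawtypes warning fails the build. A minimal illustration (class and field names are hypothetical, not from this commit):

    import org.apache.hadoop.mapreduce.OutputFormat;

    class RawTypeExample {
        // javac -Xlint:all emits: warning: [rawtypes] found raw type: OutputFormat
        // and -Werror turns that warning into a compile error,
        // unless the declaration is annotated @SuppressWarnings("rawtypes")
        OutputFormat rawFormat;   // raw use of the generic OutputFormat<K,V>
    }
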
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
index 5181ef05e2c..720a6adf477 100644
--- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
@@ -14,6 +14,7 @@ import java.util.Properties;
*
* @author lesters
*/
+@SuppressWarnings("rawtypes")
public class VespaOutputFormat extends OutputFormat {
private final Properties configOverride;
@@ -45,7 +46,6 @@ public class VespaOutputFormat extends OutputFormat {
@Override
public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
-
}
}
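
As a usage sketch (not part of this commit), a MapReduce job selects this output format and points it at a feed endpoint through the VespaConfiguration property names; the endpoint value and class name below are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
    import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;

    public class FeedJobSetup {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set(VespaConfiguration.ENDPOINT, "vespa-endpoint.example.com"); // illustrative endpoint
            conf.set(VespaConfiguration.DATA_FORMAT, "json");                    // or "xml"
            Job job = Job.getInstance(conf, "vespa-feed");
            job.setOutputFormatClass(VespaOutputFormat.class);
            // input format, mapper and output path setup omitted;
            // each emitted value must be a single Vespa document operation
        }
    }
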
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
index 015d79cd177..8072e99e2ab 100644
--- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
@@ -1,5 +1,23 @@
package com.yahoo.vespa.hadoop.mapreduce;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Random;
+import java.util.StringTokenizer;
+import java.util.logging.Logger;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.FactoryConfigurationError;
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
@@ -10,14 +28,13 @@ import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation;
import com.yahoo.vespa.http.client.FeedClient;
import com.yahoo.vespa.http.client.FeedClientFactory;
import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.config.*;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.StringTokenizer;
-import java.util.logging.Logger;
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespa.http.client.config.FeedParams.DataFormat;
+import com.yahoo.vespa.http.client.config.SessionParams;
/**
* VespaRecordWriter sends the output &lt;key, value&gt; to one or more Vespa
@@ -25,6 +42,7 @@ import java.util.logging.Logger;
*
* @author lesters
*/
+@SuppressWarnings("rawtypes")
public class VespaRecordWriter extends RecordWriter {
private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName());
@@ -36,7 +54,6 @@ public class VespaRecordWriter extends RecordWriter {
private final VespaConfiguration configuration;
private final int progressInterval;
-
VespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) {
this.counters = counters;
this.configuration = configuration;
@@ -50,13 +67,13 @@ public class VespaRecordWriter extends RecordWriter {
initialize();
}
- // Assumption: json - xml not currently supported
- String json = data.toString().trim();
+ String doc = data.toString().trim();
- // Parse json to find document id - if none found, skip this write
- String docId = findDocId(json);
+ // Parse data to find document id - if none found, skip this write
+ String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc)
+ : findDocIdFromXml(doc);
if (docId != null && docId.length() > 0) {
- feedClient.stream(docId, json);
+ feedClient.stream(docId, doc);
counters.incrementDocumentsSent(1);
} else {
counters.incrementDocumentsSkipped(1);
@@ -83,11 +100,20 @@ public class VespaRecordWriter extends RecordWriter {
private void initialize() {
+ if (!configuration.dryrun() && configuration.randomSartupSleepMs() > 0) {
+ int delay = new Random().nextInt(configuration.randomSartupSleepMs());
+ log.info("VespaStorage: Delaying startup by " + delay + " ms");
+ try {
+ Thread.sleep(delay);
+ } catch (Exception e) {}
+ }
+
ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder();
connParamsBuilder.setDryRun(configuration.dryrun());
connParamsBuilder.setUseCompression(configuration.useCompression());
connParamsBuilder.setEnableV3Protocol(configuration.useV3Protocol());
connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections());
+ connParamsBuilder.setMaxRetries(configuration.numRetries());
if (configuration.proxyHost() != null) {
connParamsBuilder.setProxyHost(configuration.proxyHost());
}
@@ -98,6 +124,8 @@ public class VespaRecordWriter extends RecordWriter {
FeedParams.Builder feedParamsBuilder = new FeedParams.Builder();
feedParamsBuilder.setDataFormat(configuration.dataFormat());
feedParamsBuilder.setRoute(configuration.route());
+ feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs());
+ feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests());
SessionParams.Builder sessionParams = new SessionParams.Builder();
sessionParams.setThrottlerMinSize(configuration.throttlerMinSize());
@@ -117,9 +145,29 @@ public class VespaRecordWriter extends RecordWriter {
feedClient = FeedClientFactory.create(sessionParams.build(), resultCallback);
initialized = true;
+ log.info("VespaStorage configuration:\n" + configuration.toString());
}
-
+ private String findDocIdFromXml(String xml) {
+ try {
+ XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml));
+ while (eventReader.hasNext()) {
+ XMLEvent event = eventReader.nextEvent();
+ if (event.getEventType() == XMLEvent.START_ELEMENT) {
+ StartElement element = event.asStartElement();
+ String elementName = element.getName().getLocalPart();
+ if (VespaDocumentOperation.Operation.valid(elementName)) {
+ return element.getAttributeByName(QName.valueOf("documentid")).getValue();
+ }
+ }
+ }
+ } catch (XMLStreamException | FactoryConfigurationError e) {
+ // same as for JSON: no document id could be extracted
+ return null;
+ }
+ return null;
+ }
+
private String findDocId(String json) throws IOException {
JsonFactory factory = new JsonFactory();
try(JsonParser parser = factory.createParser(json)) {
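
Only the signature of findDocId appears above; a minimal sketch of the technique with Jackson's streaming parser, assuming (as the test data in this commit suggests) that the operation field name ("put", "update", "remove" or "id") maps directly to the document id:

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonParser;
    import com.fasterxml.jackson.core.JsonToken;
    import java.io.IOException;

    static String findDocIdSketch(String json) throws IOException {
        try (JsonParser parser = new JsonFactory().createParser(json)) {
            for (JsonToken token = parser.nextToken(); token != null; token = parser.nextToken()) {
                if (token == JsonToken.FIELD_NAME) {
                    String name = parser.getCurrentName();
                    if ("put".equals(name) || "update".equals(name)
                            || "remove".equals(name) || "id".equals(name)) {
                        parser.nextToken();                // advance to the id value
                        return parser.getValueAsString();  // e.g. "id:testapp:metric::clicks-2015110414"
                    }
                }
            }
        }
        return null;  // no document id found - the record is counted as skipped
    }
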
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java
new file mode 100644
index 00000000000..d7bdc592fd5
--- /dev/null
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java
@@ -0,0 +1,98 @@
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+
+/**
+ * Simple JSON reader which splits the input file along JSON object boundaries.
+ *
+ * There are two cases handled here:
+ * 1. Each line contains a JSON object, i.e. { ... }
+ * 2. The file contains an array of objects with arbitrary line breaks, i.e. [ {...}, {...} ]
+ *
+ * Not suitable for cases where you want to extract objects from some other arbitrary structure.
+ *
+ * TODO: Support config which points to an array in the JSON as the start point for object extraction,
+ * like it is done in VespaHttpClient.parseResultJson, i.e. support a rootNode config.
+ *
+ * @author lesters
+ */
+public class VespaSimpleJsonInputFormat extends FileInputFormat<Text, NullWritable> {
+
+ @Override
+ public RecordReader<Text, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ return new VespaJsonRecordReader();
+ }
+
+ public static class VespaJsonRecordReader extends RecordReader<Text, NullWritable> {
+ private long remaining;
+ private JsonParser parser;
+ private Text currentKey;
+ private NullWritable currentValue = NullWritable.get();
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ FileSplit fileSplit = (FileSplit) split;
+ FSDataInputStream stream = FileSystem.get(context.getConfiguration()).open(fileSplit.getPath());
+ if (fileSplit.getStart() != 0) {
+ stream.seek(fileSplit.getStart());
+ }
+
+ remaining = fileSplit.getLength();
+
+ JsonFactory factory = new JsonFactory();
+ parser = factory.createParser(new BufferedInputStream(stream));
+ parser.setCodec(new ObjectMapper());
+ parser.nextToken();
+ if (parser.currentToken() == JsonToken.START_ARRAY) {
+ parser.nextToken();
+ }
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (parser.currentToken() != JsonToken.START_OBJECT) {
+ return true;
+ }
+ currentKey = new Text(parser.readValueAsTree().toString());
+ parser.nextToken();
+ return false;
+ }
+
+ @Override
+ public Text getCurrentKey() throws IOException, InterruptedException {
+ return currentKey;
+ }
+
+ @Override
+ public NullWritable getCurrentValue() throws IOException, InterruptedException {
+ return currentValue;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return (float) parser.getCurrentLocation().getByteOffset() / remaining;
+ }
+
+ @Override
+ public void close() throws IOException {
+ parser.close();
+ }
+ }
+
+}
+
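A sketch of wiring this input format into a plain MapReduce job (the input path is illustrative); the Pig loader added further down in this commit builds on the same record reader:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import com.yahoo.vespa.hadoop.mapreduce.VespaSimpleJsonInputFormat;

    public class JsonInputSetup {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();
            job.setInputFormatClass(VespaSimpleJsonInputFormat.class);        // keys are JSON objects as Text
            FileInputFormat.addInputPath(job, new Path("operations.json"));   // illustrative path
        }
    }
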
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java
index cd692c69ba7..1d11a3f9fdd 100644
--- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java
@@ -60,7 +60,7 @@ public class TupleTools {
while (m.find()) {
Object value = fields.get(m.group(1));
String replacement = value != null ? value.toString() : m.group(0);
- m.appendReplacement(sb, replacement);
+ m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
}
m.appendTail(sb);
return sb.toString();
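
The Matcher.quoteReplacement change above is needed because appendReplacement interprets '$' and '\' in the replacement string as group references; the TupleToolsTest added later in this commit exercises exactly this. A small demonstration of the failure mode it fixes:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class QuoteReplacementDemo {
        public static void main(String[] args) {
            Matcher m = Pattern.compile("<(\\w+)>").matcher("Id is <id>");
            StringBuffer sb = new StringBuffer();
            String value = "_!@#$%^&*()";   // a field value containing '$'
            while (m.find()) {
                // m.appendReplacement(sb, value);  // throws IllegalArgumentException: Illegal group reference
                m.appendReplacement(sb, Matcher.quoteReplacement(value));
            }
            m.appendTail(sb);
            System.out.println(sb);         // prints: Id is _!@#$%^&*()
        }
    }
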
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
index f11b09894d0..6e35e5e465a 100644
--- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
@@ -21,6 +21,10 @@ public class VespaConfiguration {
public static final String THROTTLER_MIN_SIZE = "vespa.feed.throttler.min.size";
public static final String QUERY_CONNECTION_TIMEOUT = "vespa.query.connection.timeout";
public static final String ROUTE = "vespa.feed.route";
+ public static final String MAX_SLEEP_TIME_MS = "vespa.feed.max.sleep.time.ms";
+ public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests";
+ public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms";
+ public static final String NUM_RETRIES = "vespa.feed.num.retries";
private final Configuration conf;
private final Properties override;
@@ -62,12 +66,12 @@ public class VespaConfiguration {
public boolean useV3Protocol() {
- return getBoolean(V3_PROTOCOL, false);
+ return getBoolean(V3_PROTOCOL, true);
}
public int numConnections() {
- return getInt(CONNECTIONS, 8);
+ return getInt(CONNECTIONS, 2);
}
@@ -80,11 +84,32 @@ public class VespaConfiguration {
return getInt(QUERY_CONNECTION_TIMEOUT, 10000);
}
+
public String route() {
return getString(ROUTE);
}
+ public int maxSleepTimeMs() {
+ return getInt(MAX_SLEEP_TIME_MS, 10000);
+ }
+
+
+ public int maxInFlightRequests() {
+ return getInt(MAX_IN_FLIGHT_REQUESTS, 1000);
+ }
+
+
+ public int randomSartupSleepMs() {
+ return getInt(RANDOM_STARTUP_SLEEP, 30000);
+ }
+
+
+ public int numRetries() {
+ return getInt(NUM_RETRIES, 100);
+ }
+
+
public FeedParams.DataFormat dataFormat() {
String format = getString(DATA_FORMAT);
if ("xml".equalsIgnoreCase(format)) {
@@ -93,6 +118,7 @@ public class VespaConfiguration {
return FeedParams.DataFormat.JSON_UTF8;
}
+
public int progressInterval() {
return getInt(PROGRESS_REPORT, 1000);
}
@@ -136,4 +162,26 @@ public class VespaConfiguration {
return properties;
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(ENDPOINT + ": " + endpoint() + "\n");
+ sb.append(PROXY_HOST + ": " + proxyHost() + "\n");
+ sb.append(PROXY_PORT + ": " + proxyPort() + "\n");
+ sb.append(DRYRUN + ": " + dryrun() + "\n");
+ sb.append(USE_COMPRESSION + ": " + useCompression() + "\n");
+ sb.append(DATA_FORMAT + ": " + dataFormat() + "\n");
+ sb.append(PROGRESS_REPORT + ": " + progressInterval() + "\n");
+ sb.append(V3_PROTOCOL + ": " + useV3Protocol() + "\n");
+ sb.append(CONNECTIONS + ": " + numConnections() + "\n");
+ sb.append(THROTTLER_MIN_SIZE + ": " + throttlerMinSize() + "\n");
+ sb.append(QUERY_CONNECTION_TIMEOUT + ": " + queryConnectionTimeout() + "\n");
+ sb.append(ROUTE + ": " + route() + "\n");
+ sb.append(MAX_SLEEP_TIME_MS + ": " + maxSleepTimeMs() + "\n");
+ sb.append(MAX_IN_FLIGHT_REQUESTS + ": " + maxInFlightRequests() + "\n");
+ sb.append(RANDOM_STARTUP_SLEEP + ": " + randomSartupSleepMs() + "\n");
+ sb.append(NUM_RETRIES + ": " + numRetries() + "\n");
+ return sb.toString();
+ }
+
}
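
A sketch of how the new feed-tuning knobs might be set on a job configuration; the values are illustrative, otherwise the defaults shown above apply:

    import org.apache.hadoop.conf.Configuration;
    import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;

    public class FeedTuning {
        public static Configuration tunedConf() {
            Configuration conf = new Configuration();
            conf.setInt(VespaConfiguration.MAX_SLEEP_TIME_MS, 5000);      // max backoff between retries
            conf.setInt(VespaConfiguration.MAX_IN_FLIGHT_REQUESTS, 500);  // cap concurrent feed operations
            conf.setInt(VespaConfiguration.RANDOM_STARTUP_SLEEP, 10000);  // spread task startup over 10 s
            conf.setInt(VespaConfiguration.NUM_RETRIES, 10);              // feed client retry budget
            return conf;
        }
    }
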
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java
index e63a4044c95..50a089c8b45 100644
--- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java
@@ -24,12 +24,13 @@ public class VespaHttpClient {
}
public VespaHttpClient(VespaConfiguration configuration) {
- httpClient = createClient(configuration);
+ httpClient = createClient(configuration);
}
public String get(String url) throws IOException {
HttpGet httpGet = new HttpGet(url);
HttpResponse httpResponse = httpClient.execute(httpGet);
+
HttpEntity entity = httpResponse.getEntity();
InputStream is = entity.getContent();
@@ -47,16 +48,36 @@ public class VespaHttpClient {
return result;
}
- public JsonNode parseResultJson(String json) throws IOException {
+ public JsonNode parseResultJson(String json, String rootNode) throws IOException {
if (json == null || json.isEmpty()) {
return null;
}
+ if (rootNode == null || rootNode.isEmpty()) {
+ return null;
+ }
+
ObjectMapper m = new ObjectMapper();
JsonNode node = m.readTree(json);
if (node != null) {
- node = node.get("root");
- if (node != null) {
- node = node.get("children");
+ String[] path = rootNode.split("/");
+ for (String p : path) {
+ node = node.get(p);
+
+ if (node == null) {
+ return null;
+ }
+
+ // if node is an array, return the first node that has the correct path
+ if (node.isArray()) {
+ for (int i = 0; i < node.size(); ++i) {
+ JsonNode n = node.get(i);
+ if (n.has(p)) {
+ node = n;
+ break;
+ }
+ }
+ }
+
}
}
return node;
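
A reduced sketch of the path walk this enables (the array special-casing above is omitted); for a standard Vespa result, the path "root/children" reaches the hits. The sample JSON is illustrative:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class RootNodeDemo {
        public static void main(String[] args) throws Exception {
            String json = "{ \"root\": { \"children\": [ { \"id\": \"hit-1\" } ] } }";
            JsonNode node = new ObjectMapper().readTree(json);
            for (String p : "root/children".split("/")) {
                node = node.get(p);   // walk one path component at a time
            }
            System.out.println(node.get(0).get("id").asText());   // prints: hit-1
        }
    }
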
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
index 20e9af222f5..017ffcdd215 100644
--- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
@@ -5,6 +5,7 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigWarning;
import org.apache.pig.data.DataBag;
@@ -27,6 +28,7 @@ import java.util.*;
public class VespaDocumentOperation extends EvalFunc<String> {
public enum Operation {
+ DOCUMENT,
PUT,
ID,
REMOVE,
@@ -108,7 +110,13 @@ public class VespaDocumentOperation extends EvalFunc<String> {
} catch (Exception e) {
- throw new IOException("Caught exception processing input row ", e);
+ StringBuilder sb = new StringBuilder();
+ sb.append("Caught exception processing input row: \n");
+ sb.append(tuple.toString());
+ sb.append("\nException: ");
+ sb.append(ExceptionUtils.getStackTrace(e));
+ warn(sb.toString(), PigWarning.UDF_WARNING_1);
+ return null;
}
return json;
@@ -123,7 +131,7 @@ public class VespaDocumentOperation extends EvalFunc<String> {
* @param docId Document id
* @param fields Fields to put in document operation
* @return A valid JSON Vespa document operation
- * @throws IOException
+ * @throws IOException if the JSON for the document operation could not be created
*/
public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties) throws IOException {
if (op == null) {
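
For reference, a hedged sketch of calling the static create method whose signature is shown above; the document id is illustrative and the comment shows only the rough shape of the emitted JSON:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Properties;
    import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation;

    public class CreateOperationDemo {
        public static void main(String[] args) throws Exception {
            Map<String, Object> fields = new LinkedHashMap<>();
            fields.put("title", "Scoundrel Days");
            String operation = VespaDocumentOperation.create(
                    VespaDocumentOperation.Operation.PUT,
                    "id:music:music::doc-1",      // illustrative document id
                    fields, new Properties());
            // roughly: {"put":"id:music:music::doc-1","fields":{"title":"Scoundrel Days"}}
            System.out.println(operation);
        }
    }
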
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java
index 5faa6da94bd..e38e87dda57 100644
--- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java
@@ -23,16 +23,18 @@ public class VespaQuery extends EvalFunc<DataBag> {
private final String PROPERTY_QUERY_TEMPLATE = "query";
private final String PROPERTY_QUERY_SCHEMA = "schema";
+ private final String PROPERTY_ROOT_NODE = "rootnode";
private final VespaConfiguration configuration;
private final Properties properties;
private final String queryTemplate;
private final String querySchema;
+ private final String queryRootNode;
private VespaHttpClient httpClient;
public VespaQuery() {
- this(null);
+ this(new String[0]);
}
public VespaQuery(String... params) {
@@ -45,7 +47,7 @@ public class VespaQuery extends EvalFunc<DataBag> {
}
querySchema = properties.getProperty(PROPERTY_QUERY_SCHEMA, "rank:int,id:chararray");
-
+ queryRootNode = properties.getProperty(PROPERTY_ROOT_NODE, "root/children");
}
@Override
@@ -89,7 +91,7 @@ public class VespaQuery extends EvalFunc<DataBag> {
}
private JsonNode parseVespaResultJson(String result) throws IOException {
- return httpClient == null ? null : httpClient.parseResultJson(result);
+ return httpClient == null ? null : httpClient.parseResultJson(result, queryRootNode);
}
private DataBag createPigRepresentation(JsonNode hits) {
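
The constructor parameters are "key=value" strings, as the new query_alt_root.pig test below shows from Pig; constructed directly in Java it would look roughly like this (the query URL is illustrative):

    import com.yahoo.vespa.hadoop.pig.VespaQuery;

    public class QuerySetup {
        public static void main(String[] args) {
            VespaQuery query = new VespaQuery(
                    "query=http://localhost:8080/search/?query=<userid>&hits=100",
                    "rootnode=root/children/children");
        }
    }
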
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java
new file mode 100644
index 00000000000..66f04be657f
--- /dev/null
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java
@@ -0,0 +1,62 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.yahoo.vespa.hadoop.mapreduce.VespaSimpleJsonInputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import java.io.IOException;
+
+/**
+ * Simple JSON loader which loads either one JSON object per line or a
+ * multiline JSON consisting of objects in an array.
+ *
+ * Returns only the textual representation of the JSON object.
+ *
+ * @author lesters
+ */
+@SuppressWarnings("rawtypes")
+public class VespaSimpleJsonLoader extends LoadFunc {
+
+ private TupleFactory tupleFactory = TupleFactory.getInstance();
+ private VespaSimpleJsonInputFormat.VespaJsonRecordReader recordReader;
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ FileInputFormat.setInputPaths(job, location);
+ }
+
+ @Override
+ public InputFormat getInputFormat() throws IOException {
+ return new VespaSimpleJsonInputFormat();
+ }
+
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
+ recordReader = (VespaSimpleJsonInputFormat.VespaJsonRecordReader) reader;
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
+ try {
+ if (!recordReader.nextKeyValue()) {
+ return null;
+ }
+ Text json = recordReader.getCurrentKey();
+ if (json == null) {
+ return null;
+ }
+ return tupleFactory.newTuple(json.toString());
+
+ } catch (InterruptedException ex) {
+ return null;
+ }
+ }
+}
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
index 4caa382223b..d5000b2b328 100644
--- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
@@ -23,6 +23,7 @@ import java.util.Properties;
*
* @author lesters
*/
+@SuppressWarnings("rawtypes")
public class VespaStorage extends StoreFunc {
private final boolean createDocOp;
diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
index 7c44b54efcb..a79f63a77ce 100644
--- a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
+++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
@@ -16,6 +16,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
index 5e2bf9b887e..0e1a8fba17c 100644
--- a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
+++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
@@ -14,6 +14,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+@SuppressWarnings("serial")
public class VespaDocumentOperationTest {
@Test
diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
index 9ba212293d0..0f123904cfb 100644
--- a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
+++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
@@ -1,49 +1,39 @@
package com.yahoo.vespa.hadoop.pig;
import com.sun.net.httpserver.HttpServer;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaHttpClient;
import com.yahoo.vespa.hadoop.util.MockQueryHandler;
-import junit.framework.Assert;
-import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.util.EntityUtils;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.data.Tuple;
import org.junit.Test;
-import java.io.IOException;
-import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.*;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
public class VespaQueryTest {
@Test
public void requireThatQueriesAreReturnedCorrectly() throws Exception {
- MockQueryHandler queryHandler = createQueryHandler();
+ runQueryTest("src/test/pig/query.pig", createQueryHandler(""), 18901);
+ }
+
+ @Test
+ public void requireThatQueriesAreReturnedCorrectlyWithAlternativeJsonRoot() throws Exception {
+ runQueryTest("src/test/pig/query_alt_root.pig", createQueryHandler("children"), 18902);
+ }
- final int port = 18901;
+ private void runQueryTest(String script, MockQueryHandler queryHandler, int port) throws Exception {
final String endpoint = "http://localhost:" + port;
HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
server.createContext("/", queryHandler);
server.start();
- PigServer ps = setup("src/test/pig/query.pig", endpoint);
+ PigServer ps = setup(script, endpoint);
Iterator<Tuple> recommendations = ps.openIterator("recommendations");
while (recommendations.hasNext()) {
@@ -81,8 +71,8 @@ public class VespaQueryTest {
return ps;
}
- private MockQueryHandler createQueryHandler() {
- MockQueryHandler queryHandler = new MockQueryHandler();
+ private MockQueryHandler createQueryHandler(String childNode) {
+ MockQueryHandler queryHandler = new MockQueryHandler(childNode);
List<String> userIds = Arrays.asList("5", "104", "313");
diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
index 33756b3ad31..322c729b8c5 100644
--- a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
+++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
@@ -1,7 +1,13 @@
package com.yahoo.vespa.hadoop.pig;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.mapred.Counters;
@@ -11,26 +17,41 @@ import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
-
import org.junit.Test;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
public class VespaStorageTest {
@Test
+ public void requireThatPremadeXmlOperationsFeedSucceeds() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.set(VespaConfiguration.DATA_FORMAT, "xml");
+ assertAllDocumentsOk("src/test/pig/feed_operations_xml.pig", conf);
+ }
+
+
+ @Test
public void requireThatPremadeOperationsFeedSucceeds() throws Exception {
assertAllDocumentsOk("src/test/pig/feed_operations.pig");
}
@Test
+ public void requireThatPremadeMultilineOperationsFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_multiline_operations.pig");
+ }
+
+
+ @Test
+ public void requireThatPremadeOperationsWithJsonLoaderFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig");
+ }
+
+
+ @Test
public void requireThatCreateOperationsFeedSucceeds() throws Exception {
assertAllDocumentsOk("src/test/pig/feed_create_operations.pig");
}
@@ -47,10 +68,13 @@ public class VespaStorageTest {
assertAllDocumentsOk("src/test/pig/feed_visit_data.pig");
}
- private PigServer setup(String script) throws Exception {
- Configuration conf = new HdfsConfiguration();
- conf.set(VespaConfiguration.DRYRUN, "true");
- conf.set(VespaConfiguration.ENDPOINT, "dummy-endpoint");
+
+ private PigServer setup(String script, Configuration conf) throws Exception {
+ if (conf == null) {
+ conf = new HdfsConfiguration();
+ }
+ conf.setIfUnset(VespaConfiguration.DRYRUN, "true");
+ conf.setIfUnset(VespaConfiguration.ENDPOINT, "dummy-endpoint");
// Parameter substitutions - can also be set by configuration
Map<String, String> parameters = new HashMap<>();
@@ -63,8 +87,14 @@ public class VespaStorageTest {
return ps;
}
+
private void assertAllDocumentsOk(String script) throws Exception {
- PigServer ps = setup(script);
+ assertAllDocumentsOk(script, null);
+ }
+
+
+ private void assertAllDocumentsOk(String script, Configuration conf) throws Exception {
+ PigServer ps = setup(script, conf);
List<ExecJob> jobs = ps.executeBatch();
PigStats stats = jobs.get(0).getStatistics();
for (JobStats js : stats.getJobGraph()) {
diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java
index f6977155b05..0bf9f6b447e 100644
--- a/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java
+++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java
@@ -19,9 +19,11 @@ import java.util.Map;
public class MockQueryHandler implements HttpHandler {
private final Map<String, List<MockQueryHit>> hitMap;
+ private final String childNode;
- public MockQueryHandler() {
+ public MockQueryHandler(String childNode) {
this.hitMap = new HashMap<>();
+ this.childNode = childNode;
}
public void handle(HttpExchange t) throws IOException {
@@ -145,9 +147,19 @@ public class MockQueryHandler implements HttpHandler {
g.writeFieldName("children");
g.writeStartArray();
+
+ if (!childNode.isEmpty()) {
+ g.writeStartObject();
+ g.writeFieldName(childNode);
+ g.writeStartArray();
+ }
}
private void writeResultsEnd(JsonGenerator g) throws IOException {
+ if (!childNode.isEmpty()) {
+ g.writeEndArray();
+ g.writeEndObject();
+ }
g.writeEndArray();
g.writeEndObject();
g.writeEndObject();
diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
index 001879cbdf8..c98e7b1c02c 100644
--- a/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
+++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
@@ -33,4 +33,18 @@ public class TupleToolsTest {
tuple.append(value);
}
+ @Test
+ public void requireThatTupleToStringHandlesStringCharacters() throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ addToTuple("id", DataType.CHARARRAY, "_!@#$%^&*()", schema, tuple);
+ addToTuple("rank", DataType.INTEGER, 1, schema, tuple);
+
+ String template = "Id is <id> and rank is <rank>";
+ String result = TupleTools.toString(schema, tuple, template);
+
+ assertEquals("Id is _!@#$%^&*() and rank is 1", result);
+ }
+
}
diff --git a/hadoop/src/test/pig/feed_multiline_operations.pig b/hadoop/src/test/pig/feed_multiline_operations.pig
new file mode 100644
index 00000000000..e9efb36858b
--- /dev/null
+++ b/hadoop/src/test/pig/feed_multiline_operations.pig
@@ -0,0 +1,14 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Define short name for VespaJsonLoader
+DEFINE VespaJsonLoader com.yahoo.vespa.hadoop.pig.VespaSimpleJsonLoader();
+
+-- Define short name for VespaStorage
+DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load data - one column for json data
+metrics = LOAD 'src/test/resources/operations_multiline_data.json' USING VespaJsonLoader() AS (data:chararray);
+
+-- Store into Vespa
+STORE metrics INTO '$ENDPOINT' USING VespaStorage();
+
diff --git a/hadoop/src/test/pig/feed_operations_with_json_loader.pig b/hadoop/src/test/pig/feed_operations_with_json_loader.pig
new file mode 100644
index 00000000000..6d31201e4eb
--- /dev/null
+++ b/hadoop/src/test/pig/feed_operations_with_json_loader.pig
@@ -0,0 +1,13 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Define short name for VespaJsonLoader
+DEFINE VespaJsonLoader com.yahoo.vespa.hadoop.pig.VespaSimpleJsonLoader();
+
+-- Define short name for VespaStorage
+DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load data - one column for json data
+metrics = LOAD 'src/test/resources/operations_data.json' USING VespaJsonLoader() AS (data:chararray);
+
+-- Store into Vespa
+STORE metrics INTO '$ENDPOINT' USING VespaStorage();
\ No newline at end of file
diff --git a/hadoop/src/test/pig/feed_operations_xml.pig b/hadoop/src/test/pig/feed_operations_xml.pig
new file mode 100644
index 00000000000..d109d56ad1e
--- /dev/null
+++ b/hadoop/src/test/pig/feed_operations_xml.pig
@@ -0,0 +1,10 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Define short name for VespaStorage
+DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load data - one column for xml data
+data = LOAD 'src/test/resources/operations_data.xml' AS (data:chararray);
+
+-- Store into Vespa
+STORE data INTO '$ENDPOINT' USING VespaStorage();
\ No newline at end of file
diff --git a/hadoop/src/test/pig/query_alt_root.pig b/hadoop/src/test/pig/query_alt_root.pig
new file mode 100644
index 00000000000..8995990e398
--- /dev/null
+++ b/hadoop/src/test/pig/query_alt_root.pig
@@ -0,0 +1,19 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Define Vespa query for retrieving blog posts
+DEFINE BlogPostRecommendations
+ com.yahoo.vespa.hadoop.pig.VespaQuery(
+ 'query=$ENDPOINT/search?query=<userid>&hits=100',
+ 'rootnode=root/children/children',
+ 'schema=rank:int,id:chararray,relevance:double,fields/id:chararray,fields/content:chararray'
+ );
+
+-- Load data from a local file
+users = LOAD 'src/test/resources/user_ids.csv' AS (userid:chararray);
+users = FILTER users BY userid IS NOT null;
+
+-- Run a set of queries against Vespa
+recommendations = FOREACH users GENERATE userid, FLATTEN(BlogPostRecommendations(*));
+
+-- Output recommendations
+DUMP recommendations;
diff --git a/hadoop/src/test/resources/operations_data.xml b/hadoop/src/test/resources/operations_data.xml
new file mode 100644
index 00000000000..cdf1ca78c1d
--- /dev/null
+++ b/hadoop/src/test/resources/operations_data.xml
@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="utf-8"?>
+<vespafeed>
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/a-ha/Scoundrel+Days"> <url>http://music.yahoo.com/a-ha/Scoundrel+Days</url> <title><![CDATA[Scoundrel Days]]></title> <artist><![CDATA[a-ha]]></artist> <year>0</year> <popularity>290</popularity> </document>
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Accept/Restless+And+Wild"> <url>http://music.yahoo.com/Accept/Restless+And+Wild</url> <title><![CDATA[Restless And Wild]]></title> <artist><![CDATA[Accept]]></artist> <year>0</year> <popularity>75</popularity> </document>
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Accept/Staying+A+Life"> <url>http://music.yahoo.com/Accept/Staying+A+Life</url> <title><![CDATA[Staying A Life]]></title> <artist><![CDATA[Accept]]></artist> <year>1985</year> <popularity>77</popularity> </document>
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Alice+In+Chains/Dirt"> <url>http://music.yahoo.com/Alice+In+Chains/Dirt</url> <title><![CDATA[Dirt]]></title> <artist><![CDATA[Alice In Chains]]></artist> <year>1992</year> <popularity>114</popularity> </document>
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Alice+In+Chains/Live"> <url>http://music.yahoo.com/Alice+In+Chains/Live</url> <title><![CDATA[Live]]></title> <artist><![CDATA[Alice In Chains]]></artist> <year>1990</year> <popularity>363</popularity> </document>
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Amy+MacDonald/This+Is+The+Life"> <url>http://music.yahoo.com/Amy+MacDonald/This+Is+The+Life</url> <title><![CDATA[This Is The Life]]></title> <artist><![CDATA[Amy MacDonald]]></artist> <year>2007</year> <popularity>355</popularity> </document>
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Ane+Brun/Duets"> <url>http://music.yahoo.com/Ane+Brun/Duets</url> <title><![CDATA[Duets]]></title> <artist><![CDATA[Ane Brun]]></artist> <year>0</year> <popularity>255</popularity> </document>
+ <update documenttype="music" documentid="id:music:music::http://music.yahoo.com/bobdylan/BestOf"><assign field="title">The Best of Bob Dylan</assign><add field="tracks"><item>Man Of Constant Sorrow</item></add></update>
+ <remove documentid="id:music:music::http://music.yahoo.com/Aqpop/Beautifully+Smart" />
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Annuals/Be+He+Me"> <url>http://music.yahoo.com/Annuals/Be+He+Me</url> <title><![CDATA[Be He Me]]></title> <artist><![CDATA[Annuals]]></artist> <year>0</year> <popularity>207</popularity> </document>
+</vespafeed>
diff --git a/hadoop/src/test/resources/operations_multiline_data.json b/hadoop/src/test/resources/operations_multiline_data.json
new file mode 100644
index 00000000000..2b51698d9b7
--- /dev/null
+++ b/hadoop/src/test/resources/operations_multiline_data.json
@@ -0,0 +1,93 @@
+[
+ {
+ "put": "id:testapp:metric::clicks-2015110414",
+ "fields": {
+ "date": "2015110414",
+ "name": "clicks",
+ "value": 1,
+ "application": "testapp"
+ }
+ },
+ {
+ "fields": {
+ "date": "2015110416",
+ "name": "clicks",
+ "value": 5,
+ "application": "testapp"
+ },
+ "put": "id:testapp:metric::clicks-2015110416"
+ },
+ {
+ "put": "id:testapp:metric::clicks-2015110415",
+ "fields": {
+ "date": "2015110415",
+ "name": "clicks",
+ "value": 2,
+ "application": "testapp"
+ }
+ },
+ {
+ "put": "id:testapp:metric::clicks-2015110417",
+ "fields": {
+ "date": "2015110417",
+ "name": "clicks",
+ "value": 3,
+ "application": "testapp"
+ }
+ },
+ {
+ "put": "id:testapp:metric::clicks-2015110418",
+ "fields": {
+ "date": "2015110418",
+ "name": "clicks",
+ "value": 6,
+ "application": "testapp"
+ }
+ },
+ {
+ "put": "id:testapp:metric::clicks-2015110419",
+ "fields": {
+ "date": "2015110419",
+ "name": "clicks",
+ "value": 3,
+ "application": "testapp"
+ }
+ },
+ {
+ "put": "id:testapp:metric::clicks-2015110420",
+ "fields": {
+ "date": "2015110420",
+ "name": "clicks",
+ "value": 4,
+ "application": "testapp"
+ }
+ },
+ {
+ "put": "id:testapp:metric::clicks-2015110421",
+ "fields": {
+ "date": "2015110421",
+ "name": "clicks",
+ "value": 2,
+ "application": "testapp"
+ }
+ },
+ {
+ "fields": {
+ "date": "2015110422",
+ "name": "clicks",
+ "value": 5,
+ "application": "testapp"
+ },
+ "condition": "metrics==0",
+ "put": "id:testapp:metric::clicks-2015110422"
+ },
+ {
+ "put": "id:testapp:metric::clicks-2015110423",
+ "fields": {
+ "date": "2015110423",
+ "name": "clicks",
+ "value": 1,
+ "application": "testapp"
+ }
+ }
+]