author      Lester Solbakken <lesters@yahoo-inc.com>  2017-03-17 12:29:41 +0100
committer   Lester Solbakken <lesters@yahoo-inc.com>  2017-03-17 12:29:41 +0100
commit      ef775d57f273a69f0de2cd52518cbd9260e55eac (patch)
tree        5b89780e05f24c70b0158968fda5225822df9933 /vespa-hadoop
parent      b09d7deb64e5a723c5a052dd2b1db225f632405f (diff)
Rename hadoop -> vespa-hadoop
Diffstat (limited to 'vespa-hadoop')
-rw-r--r--  vespa-hadoop/OWNERS  1
-rw-r--r--  vespa-hadoop/README  4
-rw-r--r--  vespa-hadoop/pom.xml  151
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java  36
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java  51
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java  222
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java  98
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java  69
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java  187
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java  104
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java  101
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java  113
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java  363
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java  113
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java  62
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java  187
-rw-r--r--  vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java  197
-rw-r--r--  vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java  274
-rw-r--r--  vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java  96
-rw-r--r--  vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java  110
-rw-r--r--  vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java  218
-rw-r--r--  vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java  50
-rw-r--r--  vespa-hadoop/src/test/pig/feed_create_operations.pig  23
-rw-r--r--  vespa-hadoop/src/test/pig/feed_create_operations_short_form.pig  18
-rw-r--r--  vespa-hadoop/src/test/pig/feed_multiline_operations.pig  14
-rw-r--r--  vespa-hadoop/src/test/pig/feed_operations.pig  10
-rw-r--r--  vespa-hadoop/src/test/pig/feed_operations_with_json_loader.pig  13
-rw-r--r--  vespa-hadoop/src/test/pig/feed_operations_xml.pig  10
-rw-r--r--  vespa-hadoop/src/test/pig/feed_visit_data.pig  11
-rw-r--r--  vespa-hadoop/src/test/pig/query.pig  18
-rw-r--r--  vespa-hadoop/src/test/pig/query_alt_root.pig  19
-rw-r--r--  vespa-hadoop/src/test/resources/operations_data.json  10
-rw-r--r--  vespa-hadoop/src/test/resources/operations_data.xml  13
-rw-r--r--  vespa-hadoop/src/test/resources/operations_multiline_data.json  93
-rw-r--r--  vespa-hadoop/src/test/resources/tabular_data.csv  11
-rw-r--r--  vespa-hadoop/src/test/resources/user_ids.csv  4
-rw-r--r--  vespa-hadoop/src/test/resources/visit_data.json  12
37 files changed, 3086 insertions, 0 deletions
diff --git a/vespa-hadoop/OWNERS b/vespa-hadoop/OWNERS
new file mode 100644
index 00000000000..6b09ce48bd4
--- /dev/null
+++ b/vespa-hadoop/OWNERS
@@ -0,0 +1 @@
+lesters
diff --git a/vespa-hadoop/README b/vespa-hadoop/README
new file mode 100644
index 00000000000..1b567b88c1d
--- /dev/null
+++ b/vespa-hadoop/README
@@ -0,0 +1,4 @@
+The Vespa Hadoop client.
+
+Contains APIs for feeding and querying Vespa from the grid.
+
diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml
new file mode 100644
index 00000000000..f23b73abc54
--- /dev/null
+++ b/vespa-hadoop/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>parent</artifactId>
+ <version>6-SNAPSHOT</version>
+ </parent>
+ <artifactId>vespa-hadoop</artifactId>
+ <version>6-SNAPSHOT</version>
+ <name>${project.artifactId}</name>
+ <description>Integration tools between Vespa and Hadoop</description>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <hadoop.version>2.7.3</hadoop.version>
+ <pig.version>0.14.0</pig.version>
+ </properties>
+
+ <dependencies>
+ <!-- Hadoop dependencies -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pig</groupId>
+ <artifactId>pig</artifactId>
+ <version>${pig.version}</version>
+ <classifier>h2</classifier>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- These are inherited from parent. Needed for correct versions on Hadoop. -->
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>2.5.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.4.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.4.1</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Vespa feeding dependencies -->
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>vespa-http-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <showDeprecation>true</showDeprecation>
+ <compilerArgs>
+ <arg>-Xlint:all</arg>
+ <arg>-Werror</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.2</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <minimizeJar>false</minimizeJar>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>com.yahoo.vespa.feeder.shaded.internal.apache.http</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>com.yahoo.vespa.feeder.shaded.internal.apache.commons</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+
+ </build>
+</project>
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java
new file mode 100644
index 00000000000..cd0f65496d0
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java
@@ -0,0 +1,36 @@
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * The output committer describes the commit of task output for a Map-Reduce
+ * job. Not currently used, but part of the Hadoop protocol since 2.7.
+ *
+ * @author lesters
+ */
+public class VespaOutputCommitter extends OutputCommitter {
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ }
+}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
new file mode 100644
index 00000000000..720a6adf477
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
@@ -0,0 +1,51 @@
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import org.apache.hadoop.mapreduce.*;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * An output specification for writing to Vespa instances in a Map-Reduce job.
+ * Mainly returns an instance of a {@link VespaRecordWriter} that does the
+ * actual feeding to Vespa.
+ *
+ * @author lesters
+ */
+@SuppressWarnings("rawtypes")
+public class VespaOutputFormat extends OutputFormat {
+
+ private final Properties configOverride;
+
+ public VespaOutputFormat() {
+ super();
+ this.configOverride = null;
+ }
+
+ public VespaOutputFormat(Properties configOverride) {
+ super();
+ this.configOverride = configOverride;
+ }
+
+
+ @Override
+ public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+ VespaCounters counters = VespaCounters.get(context);
+ VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride);
+ return new VespaRecordWriter(configuration, counters);
+ }
+
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ return new VespaOutputCommitter();
+ }
+
+
+ @Override
+ public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+ }
+
+}
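
For orientation, this output format is wired into a job like any other Hadoop OutputFormat; the end-to-end setup used for testing lives in MapReduceTest.java in this change. A minimal sketch, with a placeholder endpoint and job name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
    import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;

    public class VespaFeedJobSketch {
        public static Job createJob() throws Exception {
            Configuration conf = new Configuration();
            conf.set(VespaConfiguration.ENDPOINT, "vespa.example.com");  // placeholder endpoint
            conf.set(VespaConfiguration.DATA_FORMAT, "json");            // "json" (default) or "xml"

            Job job = Job.getInstance(conf, "vespa-feed");               // placeholder job name
            job.setOutputFormatClass(VespaOutputFormat.class);
            // The mapper or reducer emits complete document operations as text values;
            // the key is ignored by VespaRecordWriter.
            return job;
        }
    }
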
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
new file mode 100644
index 00000000000..8072e99e2ab
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
@@ -0,0 +1,222 @@
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Random;
+import java.util.StringTokenizer;
+import java.util.logging.Logger;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.FactoryConfigurationError;
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation;
+import com.yahoo.vespa.http.client.FeedClient;
+import com.yahoo.vespa.http.client.FeedClientFactory;
+import com.yahoo.vespa.http.client.Result;
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespa.http.client.config.FeedParams.DataFormat;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+
+/**
+ * VespaRecordWriter sends the output &lt;key, value&gt; to one or more Vespa
+ * endpoints.
+ *
+ * @author lesters
+ */
+@SuppressWarnings("rawtypes")
+public class VespaRecordWriter extends RecordWriter {
+
+ private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName());
+
+ private boolean initialized = false;
+ private FeedClient feedClient;
+
+ private final VespaCounters counters;
+ private final VespaConfiguration configuration;
+ private final int progressInterval;
+
+ VespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) {
+ this.counters = counters;
+ this.configuration = configuration;
+ this.progressInterval = configuration.progressInterval();
+ }
+
+
+ @Override
+ public void write(Object key, Object data) throws IOException, InterruptedException {
+ if (!initialized) {
+ initialize();
+ }
+
+ String doc = data.toString().trim();
+
+ // Parse data to find document id - if none found, skip this write
+ String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc)
+ : findDocIdFromXml(doc);
+ if (docId != null && docId.length() > 0) {
+ feedClient.stream(docId, doc);
+ counters.incrementDocumentsSent(1);
+ } else {
+ counters.incrementDocumentsSkipped(1);
+ }
+
+ if (counters.getDocumentsSent() % progressInterval == 0) {
+ String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)",
+ counters.getDocumentsSent(),
+ counters.getDocumentsOk(),
+ counters.getDocumentsFailed(),
+ counters.getDocumentsSkipped());
+ log.info(progress);
+ }
+
+ }
+
+
+ @Override
+ public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ if (feedClient != null) {
+ feedClient.close();
+ }
+ }
+
+
+ private void initialize() {
+ if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) {
+ int delay = new Random().nextInt(configuration.randomStartupSleepMs());
+ log.info("VespaStorage: Delaying startup by " + delay + " ms");
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
+ }
+
+ ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder();
+ connParamsBuilder.setDryRun(configuration.dryrun());
+ connParamsBuilder.setUseCompression(configuration.useCompression());
+ connParamsBuilder.setEnableV3Protocol(configuration.useV3Protocol());
+ connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections());
+ connParamsBuilder.setMaxRetries(configuration.numRetries());
+ if (configuration.proxyHost() != null) {
+ connParamsBuilder.setProxyHost(configuration.proxyHost());
+ }
+ if (configuration.proxyPort() >= 0) {
+ connParamsBuilder.setProxyPort(configuration.proxyPort());
+ }
+
+ FeedParams.Builder feedParamsBuilder = new FeedParams.Builder();
+ feedParamsBuilder.setDataFormat(configuration.dataFormat());
+ feedParamsBuilder.setRoute(configuration.route());
+ feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs());
+ feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests());
+
+ SessionParams.Builder sessionParams = new SessionParams.Builder();
+ sessionParams.setThrottlerMinSize(configuration.throttlerMinSize());
+ sessionParams.setConnectionParams(connParamsBuilder.build());
+ sessionParams.setFeedParams(feedParamsBuilder.build());
+
+ String endpoints = configuration.endpoint();
+ StringTokenizer tokenizer = new StringTokenizer(endpoints, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String endpoint = tokenizer.nextToken().trim();
+ sessionParams.addCluster(new Cluster.Builder().addEndpoint(
+ Endpoint.create(endpoint, 4080, false)
+ ).build());
+ }
+
+ ResultCallback resultCallback = new ResultCallback(counters);
+ feedClient = FeedClientFactory.create(sessionParams.build(), resultCallback);
+
+ initialized = true;
+ log.info("VespaStorage configuration:\n" + configuration.toString());
+ }
+
+ private String findDocIdFromXml(String xml) {
+ try {
+ XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml));
+ while (eventReader.hasNext()) {
+ XMLEvent event = eventReader.nextEvent();
+ if (event.getEventType() == XMLEvent.START_ELEMENT) {
+ StartElement element = event.asStartElement();
+ String elementName = element.getName().getLocalPart();
+ if (VespaDocumentOperation.Operation.valid(elementName)) {
+ return element.getAttributeByName(QName.valueOf("documentid")).getValue();
+ }
+ }
+ }
+ } catch (XMLStreamException | FactoryConfigurationError e) {
+ // on parse errors, return null - same behaviour as the JSON variant below
+ return null;
+ }
+ return null;
+ }
+
+ private String findDocId(String json) throws IOException {
+ JsonFactory factory = new JsonFactory();
+ try(JsonParser parser = factory.createParser(json)) {
+ if (parser.nextToken() != JsonToken.START_OBJECT) {
+ return null;
+ }
+ while (parser.nextToken() != JsonToken.END_OBJECT) {
+ String fieldName = parser.getCurrentName();
+ parser.nextToken();
+ if (VespaDocumentOperation.Operation.valid(fieldName)) {
+ String docId = parser.getText();
+ return docId;
+ } else {
+ parser.skipChildren();
+ }
+ }
+ } catch (JsonParseException ex) {
+ return null;
+ }
+ return null;
+ }
+
+
+ static class ResultCallback implements FeedClient.ResultCallback {
+ final VespaCounters counters;
+
+ public ResultCallback(VespaCounters counters) {
+ this.counters = counters;
+ }
+
+ @Override
+ public void onCompletion(String docId, Result documentResult) {
+ if (!documentResult.isSuccess()) {
+ counters.incrementDocumentsFailed(1);
+ StringBuilder sb = new StringBuilder();
+ sb.append("Problems with docid ");
+ sb.append(docId);
+ sb.append(": ");
+ List<Result.Detail> details = documentResult.getDetails();
+ for (Result.Detail detail : details) {
+ sb.append(detail.toString());
+ sb.append(" ");
+ }
+ log.warning(sb.toString());
+ return;
+ }
+ counters.incrementDocumentsOk(1);
+ }
+
+ }
+
+}
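
The record writer above finds the document id by scanning the operation for a top-level field whose name is a valid operation (put, update, remove, ...); records without one are counted as skipped. A sketch of the kind of value a task would emit to it, with a made-up document id and fields:

    public class PutOperationSketch {
        // Example document operation in the JSON format VespaRecordWriter accepts.
        // Document id and fields are illustrative only.
        public static String putOperation() {
            return "{"
                 + " \"put\": \"id:mynamespace:mydocument::doc-1\","
                 + " \"fields\": { \"title\": \"hello\", \"views\": 3 }"
                 + " }";
        }
    }
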
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java
new file mode 100644
index 00000000000..d7bdc592fd5
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java
@@ -0,0 +1,98 @@
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+
+/**
+ * Simple JSON reader which splits the input file along JSON object boundaries.
+ *
+ * There are two cases handled here:
+ * 1. Each line contains a JSON object, i.e. { ... }
+ * 2. The file contains an array of objects with arbitrary line breaks, i.e. [ {...}, {...} ]
+ *
+ * Not suitable for cases where you want to extract objects from some other arbitrary structure.
+ *
+ * TODO: Support config which points to an array in the JSON as the starting point for object extraction,
+ * similar to how it is done in VespaHttpClient.parseResultJson, i.e. support a rootNode config.
+ *
+ * @author lesters
+ */
+public class VespaSimpleJsonInputFormat extends FileInputFormat<Text, NullWritable> {
+
+ @Override
+ public RecordReader<Text, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ return new VespaJsonRecordReader();
+ }
+
+ public static class VespaJsonRecordReader extends RecordReader<Text, NullWritable> {
+ private long remaining;
+ private JsonParser parser;
+ private Text currentKey;
+ private NullWritable currentValue = NullWritable.get();
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ FileSplit fileSplit = (FileSplit) split;
+ FSDataInputStream stream = FileSystem.get(context.getConfiguration()).open(fileSplit.getPath());
+ if (fileSplit.getStart() != 0) {
+ stream.seek(fileSplit.getStart());
+ }
+
+ remaining = fileSplit.getLength();
+
+ JsonFactory factory = new JsonFactory();
+ parser = factory.createParser(new BufferedInputStream(stream));
+ parser.setCodec(new ObjectMapper());
+ parser.nextToken();
+ if (parser.currentToken() == JsonToken.START_ARRAY) {
+ parser.nextToken();
+ }
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (parser.currentToken() != JsonToken.START_OBJECT) {
+ return true;
+ }
+ currentKey = new Text(parser.readValueAsTree().toString());
+ parser.nextToken();
+ return false;
+ }
+
+ @Override
+ public Text getCurrentKey() throws IOException, InterruptedException {
+ return currentKey;
+ }
+
+ @Override
+ public NullWritable getCurrentValue() throws IOException, InterruptedException {
+ return currentValue;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return (float) parser.getCurrentLocation().getByteOffset() / remaining;
+ }
+
+ @Override
+ public void close() throws IOException {
+ parser.close();
+ }
+ }
+
+}
+
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java
new file mode 100644
index 00000000000..1d11a3f9fdd
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java
@@ -0,0 +1,69 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class TupleTools {
+
+ private static final Pattern pattern = Pattern.compile("<([\\w]+)>");
+
+ public static Map<String, Object> tupleMap(Schema schema, Tuple tuple) throws IOException {
+ Map<String, Object> tupleMap = new HashMap<>((int)Math.ceil(tuple.size() / 0.75) + 1);
+ List<Schema.FieldSchema> schemas = schema.getFields();
+ for (int i = 0; i < schemas.size(); i++) {
+ Schema.FieldSchema field = schemas.get(i);
+ String alias = field.alias;
+ Object value = tuple.get(i);
+ if (value != null) {
+ tupleMap.put(alias, value);
+ }
+ }
+ return tupleMap;
+ }
+
+ public static Map<String, Object> tupleMap(ResourceSchema schema, Tuple tuple) throws IOException {
+ Map<String, Object> tupleMap = new HashMap<>((int)Math.ceil(tuple.size() / 0.75) + 1);
+ ResourceSchema.ResourceFieldSchema[] schemas = schema.getFields();
+ for (int i = 0; i < schemas.length; i++) {
+ ResourceSchema.ResourceFieldSchema field = schemas[i];
+ String alias = field.getName();
+ Object value = tuple.get(i);
+ if (value != null) {
+ tupleMap.put(alias, value);
+ }
+ }
+ return tupleMap;
+ }
+
+ public static String toString(Schema schema, Tuple tuple, String template) throws IOException {
+ return toString(tupleMap(schema, tuple), template);
+ }
+
+ public static String toString(Map<String,Object> fields, String template) {
+ if (template == null || template.length() == 0) {
+ return template;
+ }
+ if (fields == null || fields.size() == 0) {
+ return template;
+ }
+
+ Matcher m = pattern.matcher(template);
+ StringBuffer sb = new StringBuffer();
+ while (m.find()) {
+ Object value = fields.get(m.group(1));
+ String replacement = value != null ? value.toString() : m.group(0);
+ m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
+ }
+ m.appendTail(sb);
+ return sb.toString();
+ }
+
+}
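
The template mechanism above replaces each <name> placeholder with the tuple field of the same alias and leaves unresolved placeholders untouched. A small sketch with invented field names:

    import java.util.HashMap;
    import java.util.Map;

    import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;

    public class TemplateSketch {
        public static void main(String[] args) {
            Map<String, Object> fields = new HashMap<>();
            fields.put("namespace", "testapp");   // invented field values
            fields.put("id", 42);

            // Placeholders in <...> are replaced by the field with the same alias.
            String docId = TupleTools.toString(fields, "id:<namespace>:metrics::<id>");
            System.out.println(docId);            // id:testapp:metrics::42
        }
    }
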
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
new file mode 100644
index 00000000000..6e35e5e465a
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
@@ -0,0 +1,187 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import com.yahoo.vespa.http.client.config.FeedParams;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Properties;
+
+public class VespaConfiguration {
+
+ public static final String ENDPOINT = "vespa.feed.endpoint";
+ public static final String PROXY_HOST = "vespa.feed.proxy.host";
+ public static final String PROXY_PORT = "vespa.feed.proxy.port";
+ public static final String DRYRUN = "vespa.feed.dryrun";
+ public static final String USE_COMPRESSION = "vespa.feed.usecompression";
+ public static final String DATA_FORMAT = "vespa.feed.data.format";
+ public static final String PROGRESS_REPORT = "vespa.feed.progress.interval";
+ public static final String V3_PROTOCOL = "vespa.feed.v3.protocol";
+ public static final String CONNECTIONS = "vespa.feed.connections";
+ public static final String THROTTLER_MIN_SIZE = "vespa.feed.throttler.min.size";
+ public static final String QUERY_CONNECTION_TIMEOUT = "vespa.query.connection.timeout";
+ public static final String ROUTE = "vespa.feed.route";
+ public static final String MAX_SLEEP_TIME_MS = "vespa.feed.max.sleep.time.ms";
+ public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests";
+ public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms";
+ public static final String NUM_RETRIES = "vespa.feed.num.retries";
+
+ private final Configuration conf;
+ private final Properties override;
+
+ private VespaConfiguration(Configuration conf, Properties override) {
+ this.conf = conf;
+ this.override = override;
+ }
+
+
+ public static VespaConfiguration get(Configuration conf, Properties override) {
+ return new VespaConfiguration(conf, override);
+ }
+
+
+ public String endpoint() {
+ return getString(ENDPOINT);
+ }
+
+
+ public String proxyHost() {
+ return getString(PROXY_HOST);
+ }
+
+
+ public int proxyPort() {
+ return getInt(PROXY_PORT, 4080);
+ }
+
+
+ public boolean dryrun() {
+ return getBoolean(DRYRUN, false);
+ }
+
+
+ public boolean useCompression() {
+ return getBoolean(USE_COMPRESSION, true);
+ }
+
+
+ public boolean useV3Protocol() {
+ return getBoolean(V3_PROTOCOL, true);
+ }
+
+
+ public int numConnections() {
+ return getInt(CONNECTIONS, 2);
+ }
+
+
+ public int throttlerMinSize() {
+ return getInt(THROTTLER_MIN_SIZE, 0);
+ }
+
+
+ public int queryConnectionTimeout() {
+ return getInt(QUERY_CONNECTION_TIMEOUT, 10000);
+ }
+
+
+ public String route() {
+ return getString(ROUTE);
+ }
+
+
+ public int maxSleepTimeMs() {
+ return getInt(MAX_SLEEP_TIME_MS, 10000);
+ }
+
+
+ public int maxInFlightRequests() {
+ return getInt(MAX_IN_FLIGHT_REQUESTS, 1000);
+ }
+
+
+ public int randomStartupSleepMs() {
+ return getInt(RANDOM_STARTUP_SLEEP, 30000);
+ }
+
+
+ public int numRetries() {
+ return getInt(NUM_RETRIES, 100);
+ }
+
+
+ public FeedParams.DataFormat dataFormat() {
+ String format = getString(DATA_FORMAT);
+ if ("xml".equalsIgnoreCase(format)) {
+ return FeedParams.DataFormat.XML_UTF8;
+ }
+ return FeedParams.DataFormat.JSON_UTF8;
+ }
+
+
+ public int progressInterval() {
+ return getInt(PROGRESS_REPORT, 1000);
+ }
+
+
+ private String getString(String name) {
+ if (override != null && override.containsKey(name)) {
+ return override.getProperty(name);
+ }
+ return conf != null ? conf.get(name) : null;
+ }
+
+
+ private int getInt(String name, int defaultValue) {
+ if (override != null && override.containsKey(name)) {
+ return Integer.parseInt(override.getProperty(name));
+ }
+ return conf != null ? conf.getInt(name, defaultValue) : defaultValue;
+ }
+
+
+ private boolean getBoolean(String name, boolean defaultValue) {
+ if (override != null && override.containsKey(name)) {
+ return Boolean.parseBoolean(override.getProperty(name));
+ }
+ return conf != null ? conf.getBoolean(name, defaultValue) : defaultValue;
+
+ }
+
+ public static Properties loadProperties(String... params) {
+ Properties properties = new Properties();
+ if (params != null) {
+ for (String s : params) {
+ try {
+ properties.load(new StringReader(s));
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ }
+ return properties;
+ }
+
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(ENDPOINT + ": " + endpoint() + "\n");
+ sb.append(PROXY_HOST + ": " + proxyHost() + "\n");
+ sb.append(PROXY_PORT + ": " + proxyPort() + "\n");
+ sb.append(DRYRUN + ": " + dryrun() +"\n");
+ sb.append(USE_COMPRESSION + ": " + useCompression() +"\n");
+ sb.append(DATA_FORMAT + ": " + dataFormat() +"\n");
+ sb.append(PROGRESS_REPORT + ": " + progressInterval() +"\n");
+ sb.append(V3_PROTOCOL + ": " + useV3Protocol() +"\n");
+ sb.append(CONNECTIONS + ": " + numConnections() +"\n");
+ sb.append(THROTTLER_MIN_SIZE + ": " + throttlerMinSize() +"\n");
+ sb.append(QUERY_CONNECTION_TIMEOUT + ": " + queryConnectionTimeout() +"\n");
+ sb.append(ROUTE + ": " + route() +"\n");
+ sb.append(MAX_SLEEP_TIME_MS + ": " + maxSleepTimeMs() +"\n");
+ sb.append(MAX_IN_FLIGHT_REQUESTS + ": " + maxInFlightRequests() +"\n");
+ sb.append(RANDOM_STARTUP_SLEEP + ": " + randomStartupSleepMs() +"\n");
+ sb.append(NUM_RETRIES + ": " + numRetries() +"\n");
+ return sb.toString();
+ }
+
+}
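
Property overrides take precedence over the Hadoop Configuration, and loadProperties parses plain key=value strings. A minimal sketch (endpoint values are placeholders):

    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;

    import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;

    public class ConfigurationSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set(VespaConfiguration.ENDPOINT, "conf-endpoint.example.com");

            // Each string is parsed with Properties.load, so one key=value per string.
            Properties override = VespaConfiguration.loadProperties(
                    "vespa.feed.endpoint=override-endpoint.example.com",
                    "vespa.feed.dryrun=true");

            VespaConfiguration vespaConf = VespaConfiguration.get(conf, override);
            System.out.println(vespaConf.endpoint());  // override-endpoint.example.com
            System.out.println(vespaConf.dryrun());    // true
        }
    }
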
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java
new file mode 100644
index 00000000000..dbe47c23814
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java
@@ -0,0 +1,104 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class VespaCounters {
+
+ public static final String GROUP = "Vespa Feed Counters";
+ public static final String DOCS_OK = "Documents ok";
+ public static final String DOCS_SENT = "Documents sent";
+ public static final String DOCS_FAILED = "Documents failed";
+ public static final String DOCS_SKIPPED = "Documents skipped";
+
+ private final Counter documentsSent;
+ private final Counter documentsOk;
+ private final Counter documentsFailed;
+ private final Counter documentsSkipped;
+
+
+ private VespaCounters(Job job) throws IOException {
+ Counters counters = job.getCounters();
+ documentsSent = counters.findCounter(GROUP, DOCS_SENT);
+ documentsOk = counters.findCounter(GROUP, DOCS_OK);
+ documentsFailed = counters.findCounter(GROUP, DOCS_FAILED);
+ documentsSkipped = counters.findCounter(GROUP, DOCS_SKIPPED);
+ }
+
+
+ private VespaCounters(TaskAttemptContext context) {
+ documentsSent = context.getCounter(GROUP, DOCS_SENT);
+ documentsOk = context.getCounter(GROUP, DOCS_OK);
+ documentsFailed = context.getCounter(GROUP, DOCS_FAILED);
+ documentsSkipped = context.getCounter(GROUP, DOCS_SKIPPED);
+ }
+
+
+ private VespaCounters(org.apache.hadoop.mapred.Counters counters) {
+ documentsSent = counters.findCounter(GROUP, DOCS_SENT);
+ documentsOk = counters.findCounter(GROUP, DOCS_OK);
+ documentsFailed = counters.findCounter(GROUP, DOCS_FAILED);
+ documentsSkipped = counters.findCounter(GROUP, DOCS_SKIPPED);
+ }
+
+
+ public static VespaCounters get(Job job) throws IOException {
+ return new VespaCounters(job);
+ }
+
+
+ public static VespaCounters get(TaskAttemptContext context) {
+ return new VespaCounters(context);
+ }
+
+
+ public static VespaCounters get(org.apache.hadoop.mapred.Counters counters) {
+ return new VespaCounters(counters);
+
+ }
+
+
+ public long getDocumentsSent() {
+ return documentsSent.getValue();
+ }
+
+
+ public void incrementDocumentsSent(long incr) {
+ documentsSent.increment(incr);
+ }
+
+
+ public long getDocumentsOk() {
+ return documentsOk.getValue();
+ }
+
+
+ public void incrementDocumentsOk(long incr) {
+ documentsOk.increment(incr);
+ }
+
+
+ public long getDocumentsFailed() {
+ return documentsFailed.getValue();
+ }
+
+
+ public void incrementDocumentsFailed(long incr) {
+ documentsFailed.increment(incr);
+ }
+
+
+ public long getDocumentsSkipped() {
+ return documentsSkipped.getValue();
+ }
+
+
+ public void incrementDocumentsSkipped(long incr) {
+ documentsSkipped.increment(incr);
+ }
+
+}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java
new file mode 100644
index 00000000000..50a089c8b45
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java
@@ -0,0 +1,101 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Scanner;
+
+public class VespaHttpClient {
+
+ private final HttpClient httpClient;
+
+ public VespaHttpClient() {
+ this(null);
+ }
+
+ public VespaHttpClient(VespaConfiguration configuration) {
+ httpClient = createClient(configuration);
+ }
+
+ public String get(String url) throws IOException {
+ HttpGet httpGet = new HttpGet(url);
+ HttpResponse httpResponse = httpClient.execute(httpGet);
+
+ HttpEntity entity = httpResponse.getEntity();
+ InputStream is = entity.getContent();
+
+ String result = "";
+ Scanner scanner = new Scanner(is, "UTF-8").useDelimiter("\\A");
+ if (scanner.hasNext()) {
+ result = scanner.next();
+ }
+ EntityUtils.consume(entity);
+
+ if (httpResponse.getStatusLine().getStatusCode() != 200) {
+ return null;
+ }
+
+ return result;
+ }
+
+ public JsonNode parseResultJson(String json, String rootNode) throws IOException {
+ if (json == null || json.isEmpty()) {
+ return null;
+ }
+ if (rootNode == null || rootNode.isEmpty()) {
+ return null;
+ }
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode node = m.readTree(json);
+ if (node != null) {
+ String[] path = rootNode.split("/");
+ for (String p : path) {
+ node = node.get(p);
+
+ if (node == null) {
+ return null;
+ }
+
+ // if node is an array, return the first node that has the correct path
+ if (node.isArray()) {
+ for (int i = 0; i < node.size(); ++i) {
+ JsonNode n = node.get(i);
+ if (n.has(p)) {
+ node = n;
+ break;
+ }
+ }
+ }
+
+ }
+ }
+ return node;
+ }
+
+ private HttpClient createClient(VespaConfiguration configuration) {
+ HttpClientBuilder clientBuilder = HttpClientBuilder.create();
+
+ RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+ if (configuration != null) {
+ requestConfigBuilder.setSocketTimeout(configuration.queryConnectionTimeout());
+ requestConfigBuilder.setConnectTimeout(configuration.queryConnectionTimeout());
+ if (configuration.proxyHost() != null) {
+ requestConfigBuilder.setProxy(new HttpHost(configuration.proxyHost(), configuration.proxyPort()));
+ }
+ }
+ clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
+ return clientBuilder.build();
+ }
+
+}
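
To illustrate the rootNode handling above: the default Vespa result layout nests hits under root/children. The sketch below uses a hand-written result; real results would come from get(url).

    import com.fasterxml.jackson.databind.JsonNode;

    import com.yahoo.vespa.hadoop.mapreduce.util.VespaHttpClient;

    public class ResultParsingSketch {
        public static void main(String[] args) throws Exception {
            // Hand-written result in the root/children layout.
            String json = "{\"root\": {\"children\": ["
                        + " {\"id\": \"hit-1\", \"relevance\": 0.9},"
                        + " {\"id\": \"hit-2\", \"relevance\": 0.8} ]}}";

            JsonNode hits = new VespaHttpClient().parseResultJson(json, "root/children");
            System.out.println(hits.size());  // 2
        }
    }
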
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java
new file mode 100644
index 00000000000..0208b4165d3
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java
@@ -0,0 +1,113 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class VespaQuerySchema implements Iterable<VespaQuerySchema.AliasTypePair> {
+
+ private final List<AliasTypePair> tupleSchema = new ArrayList<>();
+
+ public VespaQuerySchema(String schema) {
+ for (String e : schema.split(",")) {
+ String[] pair = e.split(":");
+ String alias = pair[0].trim();
+ String type = pair[1].trim();
+ tupleSchema.add(new AliasTypePair(alias, type));
+ }
+ }
+
+ public Tuple buildTuple(int rank, JsonNode hit) {
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ for (VespaQuerySchema.AliasTypePair tupleElement : tupleSchema) {
+ String alias = tupleElement.getAlias();
+ Byte type = DataType.findTypeByName(tupleElement.getType());
+
+ // reserved word
+ if ("rank".equals(alias)) {
+ tuple.append(rank);
+ } else {
+ JsonNode field = hit;
+ String[] path = alias.split("/"); // TODO: move this split outside the loop
+ for (String p : path) {
+ field = field.get(p);
+ if (field == null) {
+ type = DataType.NULL; // effectively skip field as it is not found
+ break;
+ }
+ }
+ switch (type) {
+ case DataType.BOOLEAN:
+ tuple.append(field.asBoolean());
+ break;
+ case DataType.INTEGER:
+ tuple.append(field.asInt());
+ break;
+ case DataType.LONG:
+ tuple.append(field.asLong());
+ break;
+ case DataType.FLOAT:
+ case DataType.DOUBLE:
+ tuple.append(field.asDouble());
+ break;
+ case DataType.DATETIME:
+ tuple.append(field.asText());
+ break;
+ case DataType.CHARARRAY:
+ tuple.append(field.asText());
+ break;
+ default:
+ // the rest of the data types are currently not supported
+ }
+ }
+ }
+ return tuple;
+ }
+
+ public static Schema getPigSchema(String schemaString) {
+ Schema schema = null;
+ schemaString = schemaString.replace("/", "_");
+ schemaString = "{(" + schemaString + ")}";
+ try {
+ schema = Utils.getSchemaFromString(schemaString);
+ } catch (ParserException e) {
+ e.printStackTrace();
+ }
+ return schema;
+ }
+
+ @Override
+ public Iterator<AliasTypePair> iterator() {
+ return tupleSchema.iterator();
+ }
+
+
+ public static class AliasTypePair {
+ private final String alias;
+ private final String type;
+
+ AliasTypePair(String alias, String type) {
+ this.alias = alias;
+ this.type = type;
+ }
+
+ public String getAlias() {
+ return alias;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ }
+
+}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
new file mode 100644
index 00000000000..017ffcdd215
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
@@ -0,0 +1,363 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigWarning;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.joda.time.DateTime;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.*;
+
+/**
+ * A Pig UDF to convert simple Pig types into a valid Vespa JSON document format.
+ *
+ * @author lesters
+ */
+public class VespaDocumentOperation extends EvalFunc<String> {
+
+ public enum Operation {
+ DOCUMENT,
+ PUT,
+ ID,
+ REMOVE,
+ UPDATE;
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase();
+ }
+
+ public static Operation fromString(String text) {
+ for (Operation op : Operation.values()) {
+ if (op.toString().equalsIgnoreCase(text)) {
+ return op;
+ }
+ }
+ throw new IllegalArgumentException("Unknown operation: " + text);
+ }
+
+ public static boolean valid(String text) {
+ for (Operation op : Operation.values()) {
+ if (op.toString().equalsIgnoreCase(text)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ }
+
+ private static final String PROPERTY_ID_TEMPLATE = "docid";
+ private static final String PROPERTY_OPERATION = "operation";
+ private static final String SIMPLE_ARRAY_FIELDS = "simple-array-fields";
+ private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields";
+ private static final String EXCLUDE_FIELDS = "exclude-fields";
+
+ private static final String PARTIAL_UPDATE_ASSIGN = "assign";
+
+ private final String template;
+ private final Operation operation;
+ private final Properties properties;
+
+ public VespaDocumentOperation(String... params) {
+ properties = VespaConfiguration.loadProperties(params);
+ template = properties.getProperty(PROPERTY_ID_TEMPLATE);
+ operation = Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put"));
+ }
+
+ @Override
+ public String exec(Tuple tuple) throws IOException {
+ if (tuple == null || tuple.size() == 0) {
+ return null;
+ }
+ if (template == null || template.length() == 0) {
+ warn("No valid document id template found. Skipping.", PigWarning.UDF_WARNING_1);
+ return null;
+ }
+ if (operation == null) {
+ warn("No valid operation found. Skipping.", PigWarning.UDF_WARNING_1);
+ return null;
+ }
+
+ String json = null;
+
+ try {
+ if (reporter != null) {
+ reporter.progress();
+ }
+
+ Map<String, Object> fields = TupleTools.tupleMap(getInputSchema(), tuple);
+ String docId = TupleTools.toString(fields, template);
+
+ // create json
+ json = create(operation, docId, fields, properties);
+ if (json == null || json.length() == 0) {
+ warn("No valid document operation could be created.", PigWarning.UDF_WARNING_1);
+ return null;
+ }
+
+
+ } catch (Exception e) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Caught exception processing input row: \n");
+ sb.append(tuple.toString());
+ sb.append("\nException: ");
+ sb.append(ExceptionUtils.getStackTrace(e));
+ warn(sb.toString(), PigWarning.UDF_WARNING_1);
+ return null;
+ }
+
+ return json;
+ }
+
+
+ /**
+ * Create a JSON Vespa document operation given the supplied fields,
+ * operation and document id template.
+ *
+ * @param op Operation (put, remove, update)
+ * @param docId Document id
+ * @param fields Fields to put in document operation
+ * @return A valid JSON Vespa document operation
+ * @throws IOException if writing the JSON operation fails
+ */
+ public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties) throws IOException {
+ if (op == null) {
+ return null;
+ }
+ if (docId == null || docId.length() == 0) {
+ return null;
+ }
+ if (fields.isEmpty()) {
+ return null;
+ }
+
+ // create json format
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+ g.writeStartObject();
+
+ g.writeStringField(op.toString(), docId);
+
+ if (op != Operation.REMOVE) {
+ writeField("fields", fields, DataType.MAP, g, properties, op, 0);
+ }
+
+ g.writeEndObject();
+ g.close();
+
+ return out.toString();
+ }
+
+
+ @SuppressWarnings("unchecked")
+ private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Operation op, int depth) throws IOException {
+ if (shouldWriteField(name, properties, depth)) {
+ g.writeFieldName(name);
+ if (shouldWritePartialUpdate(op, depth)) {
+ writePartialUpdate(value, type, g, name, properties, op, depth);
+ } else {
+ writeValue(value, type, g, name, properties, op, depth);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void writeValue(Object value, Byte type, JsonGenerator g, String name, Properties properties, Operation op, int depth) throws IOException {
+ switch (type) {
+ case DataType.UNKNOWN:
+ break;
+ case DataType.NULL:
+ g.writeNull();
+ break;
+ case DataType.BOOLEAN:
+ g.writeBoolean((boolean) value);
+ break;
+ case DataType.INTEGER:
+ g.writeNumber((int) value);
+ break;
+ case DataType.LONG:
+ g.writeNumber((long) value);
+ break;
+ case DataType.FLOAT:
+ g.writeNumber((float) value);
+ break;
+ case DataType.DOUBLE:
+ g.writeNumber((double) value);
+ break;
+ case DataType.DATETIME:
+ g.writeNumber(((DateTime) value).getMillis());
+ break;
+ case DataType.BYTEARRAY:
+ DataByteArray bytes = (DataByteArray) value;
+ String raw = Base64.getEncoder().encodeToString(bytes.get());
+ g.writeString(raw);
+ break;
+ case DataType.CHARARRAY:
+ g.writeString((String) value);
+ break;
+ case DataType.BIGINTEGER:
+ g.writeNumber((BigInteger) value);
+ break;
+ case DataType.BIGDECIMAL:
+ g.writeNumber((BigDecimal) value);
+ break;
+ case DataType.MAP:
+ g.writeStartObject();
+ Map<Object, Object> map = (Map<Object, Object>) value;
+ if (shouldCreateTensor(map, name, properties)) {
+ writeTensor(map, g);
+ } else {
+ for (Map.Entry<Object, Object> entry : map.entrySet()) {
+ String k = entry.getKey().toString();
+ Object v = entry.getValue();
+ Byte t = DataType.findType(v);
+ writeField(k, v, t, g, properties, op, depth+1);
+ }
+ }
+ g.writeEndObject();
+ break;
+ case DataType.TUPLE:
+ Tuple tuple = (Tuple) value;
+ boolean writeStartArray = shouldWriteTupleStart(tuple, name, properties);
+ if (writeStartArray) {
+ g.writeStartArray();
+ }
+ for (Object v : tuple) {
+ writeValue(v, DataType.findType(v), g, name, properties, op, depth);
+ }
+ if (writeStartArray) {
+ g.writeEndArray();
+ }
+ break;
+ case DataType.BAG:
+ DataBag bag = (DataBag) value;
+ g.writeStartArray();
+ for (Tuple t : bag) {
+ writeValue(t, DataType.TUPLE, g, name, properties, op, depth);
+ }
+ g.writeEndArray();
+ break;
+ }
+
+ }
+
+ private static boolean shouldWritePartialUpdate(Operation op, int depth) {
+ return op == Operation.UPDATE && depth == 1;
+ }
+
+ private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Operation op, int depth) throws IOException {
+ g.writeStartObject();
+ g.writeFieldName(PARTIAL_UPDATE_ASSIGN); // TODO: lookup field name in a property to determine correct operation
+ writeValue(value, type, g, name, properties, op, depth);
+ g.writeEndObject();
+ }
+
+ private static boolean shouldWriteTupleStart(Tuple tuple, String name, Properties properties) {
+ if (tuple.size() > 1 || properties == null) {
+ return true;
+ }
+ String simpleArrayFields = properties.getProperty(SIMPLE_ARRAY_FIELDS);
+ if (simpleArrayFields == null) {
+ return true;
+ }
+ if (simpleArrayFields.equals("*")) {
+ return false;
+ }
+ String[] fields = simpleArrayFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static boolean shouldCreateTensor(Map<Object, Object> map, String name, Properties properties) {
+ if (properties == null) {
+ return false;
+ }
+ String tensorFields = properties.getProperty(CREATE_TENSOR_FIELDS);
+ if (tensorFields == null) {
+ return false;
+ }
+ String[] fields = tensorFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static boolean shouldWriteField(String name, Properties properties, int depth) {
+ if (properties == null || depth != 1) {
+ return true;
+ }
+ String excludeFields = properties.getProperty(EXCLUDE_FIELDS);
+ if (excludeFields == null) {
+ return true;
+ }
+ String[] fields = excludeFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static void writeTensor(Map<Object, Object> map, JsonGenerator g) throws IOException {
+ g.writeFieldName("cells");
+ g.writeStartArray();
+ for (Map.Entry<Object, Object> entry : map.entrySet()) {
+ String k = entry.getKey().toString();
+ Double v = Double.parseDouble(entry.getValue().toString());
+
+ g.writeStartObject();
+
+ // Write address
+ g.writeFieldName("address");
+ g.writeStartObject();
+
+ String[] dimensions = k.split(",");
+ for (String dimension : dimensions) {
+ if (dimension == null || dimension.isEmpty()) {
+ continue;
+ }
+ String[] address = dimension.split(":");
+ if (address.length != 2) {
+ throw new IllegalArgumentException("Malformed cell address: " + dimension);
+ }
+ String dim = address[0];
+ String label = address[1];
+ if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) {
+ throw new IllegalArgumentException("Malformed cell address: " + dimension);
+ }
+ g.writeFieldName(dim.trim());
+ g.writeString(label.trim());
+ }
+ g.writeEndObject();
+
+ // Write value
+ g.writeFieldName("value");
+ g.writeNumber(v);
+
+ g.writeEndObject();
+ }
+ g.writeEndArray();
+ }
+
+}
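
The static create() helper above can be exercised directly. In the sketch below the document id and field names are made up, and map field order in the output may vary:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation;

    public class DocumentOperationSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> fields = new HashMap<>();
            fields.put("title", "grid feeding");
            fields.put("views", 3L);

            String json = VespaDocumentOperation.create(
                    VespaDocumentOperation.Operation.PUT,
                    "id:testapp:article::article-1",   // made-up document id
                    fields,
                    new Properties());

            // Something like: {"put":"id:testapp:article::article-1","fields":{"title":"grid feeding","views":3}}
            System.out.println(json);
        }
    }
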
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java
new file mode 100644
index 00000000000..e38e87dda57
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java
@@ -0,0 +1,113 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaHttpClient;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaQuerySchema;
+import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.UDFContext;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A Pig UDF to run a query against a Vespa cluster and return the
+ * results.
+ *
+ * @author lesters
+ */
+public class VespaQuery extends EvalFunc<DataBag> {
+
+ private final String PROPERTY_QUERY_TEMPLATE = "query";
+ private final String PROPERTY_QUERY_SCHEMA = "schema";
+ private final String PROPERTY_ROOT_NODE = "rootnode";
+
+ private final VespaConfiguration configuration;
+ private final Properties properties;
+ private final String queryTemplate;
+ private final String querySchema;
+ private final String queryRootNode;
+
+ private VespaHttpClient httpClient;
+
+ public VespaQuery() {
+ this(new String[0]);
+ }
+
+ public VespaQuery(String... params) {
+ configuration = VespaConfiguration.get(UDFContext.getUDFContext().getJobConf(), null);
+ properties = VespaConfiguration.loadProperties(params);
+
+ queryTemplate = properties.getProperty(PROPERTY_QUERY_TEMPLATE);
+ if (queryTemplate == null || queryTemplate.isEmpty()) {
+ throw new IllegalArgumentException("Query template cannot be empty");
+ }
+
+ querySchema = properties.getProperty(PROPERTY_QUERY_SCHEMA, "rank:int,id:chararray");
+ queryRootNode = properties.getProperty(PROPERTY_ROOT_NODE, "root/children");
+ }
+
+ @Override
+ public DataBag exec(Tuple input) throws IOException {
+ if (input == null || input.size() == 0) {
+ return null;
+ }
+ JsonNode jsonResult = queryVespa(input);
+ if (jsonResult == null) {
+ return null;
+ }
+ return createPigRepresentation(jsonResult);
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ return VespaQuerySchema.getPigSchema(querySchema);
+ }
+
+
+ private JsonNode queryVespa(Tuple input) throws IOException {
+ String url = createVespaQueryUrl(input);
+ if (url == null) {
+ return null;
+ }
+ String result = executeVespaQuery(url);
+ return parseVespaResultJson(result);
+ }
+
+
+ private String createVespaQueryUrl(Tuple input) throws IOException {
+ return TupleTools.toString(getInputSchema(), input, queryTemplate);
+ }
+
+
+ private String executeVespaQuery(String url) throws IOException {
+ if (httpClient == null) {
+ httpClient = new VespaHttpClient(configuration);
+ }
+ return httpClient.get(url);
+ }
+
+ private JsonNode parseVespaResultJson(String result) throws IOException {
+ return httpClient == null ? null : httpClient.parseResultJson(result, queryRootNode);
+ }
+
+ private DataBag createPigRepresentation(JsonNode hits) {
+ DataBag bag = new SortedDataBag(null);
+ VespaQuerySchema querySchema = new VespaQuerySchema(this.querySchema);
+
+ for (int rank = 0; rank < hits.size(); ++rank) {
+ JsonNode hit = hits.get(rank);
+ Tuple tuple = querySchema.buildTuple(rank, hit);
+ bag.add(tuple);
+ }
+
+ return bag;
+ }
+
+
+
+
+}
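
The UDF above is normally DEFINEd from Pig (see query.pig and query_alt_root.pig in this change); its constructor parameters are plain key=value strings parsed by VespaConfiguration.loadProperties. A sketch with a placeholder query URL, where <userid> is filled in from the input tuple at exec time:

    import com.yahoo.vespa.hadoop.pig.VespaQuery;

    public class QueryUdfSketch {
        public static VespaQuery createUdf() {
            // Placeholder endpoint; rank and id are the default schema fields.
            return new VespaQuery(
                    "query=http://vespa.example.com:4080/search/?query=userid:<userid>&hits=10",
                    "schema=rank:int,id:chararray");
        }
    }
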
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java
new file mode 100644
index 00000000000..66f04be657f
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java
@@ -0,0 +1,62 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.yahoo.vespa.hadoop.mapreduce.VespaSimpleJsonInputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import java.io.IOException;
+
+/**
+ * Simple JSON loader which loads either one JSON object per line or a
+ * multiline JSON consisting of objects in an array.
+ *
+ * Returns only the textual representation of the JSON object.
+ *
+ * @author lesters
+ */
+@SuppressWarnings("rawtypes")
+public class VespaSimpleJsonLoader extends LoadFunc {
+
+ private TupleFactory tupleFactory = TupleFactory.getInstance();
+ private VespaSimpleJsonInputFormat.VespaJsonRecordReader recordReader;
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ FileInputFormat.setInputPaths(job, location);
+ }
+
+ @Override
+ public InputFormat getInputFormat() throws IOException {
+ return new VespaSimpleJsonInputFormat();
+ }
+
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
+ recordReader = (VespaSimpleJsonInputFormat.VespaJsonRecordReader) reader;
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
+ try {
+ boolean done = recordReader.nextKeyValue();
+ if (done) {
+ return null;
+ }
+ Text json = recordReader.getCurrentKey();
+ if (json == null) {
+ return null;
+ }
+ return tupleFactory.newTuple(json.toString());
+
+ } catch (InterruptedException ex) {
+ return null;
+ }
+ }
+}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
new file mode 100644
index 00000000000..d5000b2b328
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
@@ -0,0 +1,187 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
+import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+import java.io.*;
+import java.util.Base64;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A small Pig UDF wrapper around the Vespa http client for
+ * feeding data into a Vespa endpoint.
+ *
+ * @author lesters
+ */
+@SuppressWarnings("rawtypes")
+public class VespaStorage extends StoreFunc {
+
+ private final boolean createDocOp;
+ private final String template;
+ private final VespaDocumentOperation.Operation operation;
+
+ private String signature = null;
+ private RecordWriter recordWriter = null;
+ private ResourceSchema resourceSchema = null;
+ private Properties properties = new Properties();
+
+ private static final String PROPERTY_CREATE_DOC_OP = "create-document-operation";
+ private static final String PROPERTY_ID_TEMPLATE = "docid";
+ private static final String PROPERTY_OPERATION = "operation";
+ private static final String PROPERTY_RESOURCE_SCHEMA = "resource_schema";
+
+ public VespaStorage() {
+ createDocOp = false;
+ template = null;
+ operation = null;
+ }
+
+ public VespaStorage(String... params) {
+ properties = VespaConfiguration.loadProperties(params);
+ createDocOp = Boolean.parseBoolean(properties.getProperty(PROPERTY_CREATE_DOC_OP, "false"));
+ operation = VespaDocumentOperation.Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put"));
+ template = properties.getProperty(PROPERTY_ID_TEMPLATE);
+ }
+
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ return new VespaOutputFormat(properties);
+ }
+
+
+ @Override
+ public void setStoreLocation(String endpoint, Job job) throws IOException {
+ properties.setProperty(VespaConfiguration.ENDPOINT, endpoint);
+ }
+
+
+ @Override
+ public void prepareToWrite(RecordWriter recordWriter) throws IOException {
+ this.recordWriter = recordWriter;
+ this.resourceSchema = getResourceSchema();
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void putNext(Tuple tuple) throws IOException {
+ if (tuple == null || tuple.size() == 0) {
+ return;
+ }
+
+ String data = null;
+ if (createDocOp) {
+ data = createDocumentOperation(tuple);
+ } else if (!tuple.isNull(0)) {
+ data = tuple.get(0).toString(); // assume single field with correctly formatted doc op.
+ }
+
+ if (data == null || data.length() == 0) {
+ return;
+ }
+
+ try {
+ recordWriter.write(0, data);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+
+ @Override
+ public void checkSchema(ResourceSchema resourceSchema) throws IOException {
+ setResourceSchema(resourceSchema);
+ }
+
+
+ @Override
+ public String relToAbsPathForStoreLocation(String endpoint, Path path) throws IOException {
+ return endpoint;
+ }
+
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String s) {
+ this.signature = s;
+ }
+
+
+ @Override
+ public void cleanupOnFailure(String s, Job job) throws IOException {
+ }
+
+
+ @Override
+ public void cleanupOnSuccess(String s, Job job) throws IOException {
+ }
+
+
+ private ResourceSchema getResourceSchema() throws IOException {
+ Properties properties = getUDFProperties();
+ return base64Deserialize(properties.getProperty(PROPERTY_RESOURCE_SCHEMA));
+ }
+
+
+ private void setResourceSchema(ResourceSchema schema) throws IOException {
+ Properties properties = getUDFProperties();
+ if (properties.getProperty(PROPERTY_RESOURCE_SCHEMA) == null) {
+ properties.setProperty(PROPERTY_RESOURCE_SCHEMA, base64Serialize(schema));
+ }
+ }
+
+
+ private Properties getUDFProperties() {
+ String[] context = { signature };
+ return UDFContext.getUDFContext().getUDFProperties(getClass(), context);
+ }
+
+
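+    // Builds a Vespa document operation from the tuple, using the configured docid template and
+    // the tuple fields. For example, with 'docid=id:<application>:metrics::<name>-<date>' and a
+    // tuple (application=testapp, name=clicks, date=20160112, value=3), the generated operation
+    // would look roughly like (field order may vary):
+    // {"put":"id:testapp:metrics::clicks-20160112","fields":{"application":"testapp","name":"clicks","date":"20160112","value":3}}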
+ private String createDocumentOperation(Tuple tuple) throws IOException {
+ if (tuple == null || tuple.size() == 0) {
+ return null;
+ }
+ if (resourceSchema == null) {
+ return null;
+ }
+
+ Map<String, Object> fields = TupleTools.tupleMap(resourceSchema, tuple);
+ String docId = TupleTools.toString(fields, template);
+
+ return VespaDocumentOperation.create(operation, docId, fields, properties);
+ }
+
+
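+    // The resource schema is captured on the Pig frontend in checkSchema() and must be handed to
+    // the backend tasks through the UDF context, which only stores strings. It is therefore
+    // serialized to a Base64 string here and deserialized again when preparing to write.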
+ public static String base64Serialize(Object o) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(o);
+ }
+ return Base64.getEncoder().encodeToString(baos.toByteArray());
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public static <T extends Serializable> T base64Deserialize(String s) throws IOException {
+ Object ret;
+ byte[] data = Base64.getDecoder().decode(s);
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
+ try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+ ret = ois.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ return (T) ret;
+ }
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
new file mode 100644
index 00000000000..a79f63a77ce
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
@@ -0,0 +1,197 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.test.PathUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MapReduceTest {
+
+ protected static File hdfsBaseDir;
+ protected static FileSystem hdfs;
+ protected static Configuration conf;
+ protected static MiniDFSCluster cluster;
+
+ protected static Path metricsJsonPath;
+ protected static Path metricsCsvPath;
+
+ @BeforeClass
+ public static void setUp() throws IOException {
+ hdfsBaseDir = new File(PathUtils.getTestDir(MapReduceTest.class).getCanonicalPath());
+
+ conf = new HdfsConfiguration();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBaseDir.getAbsolutePath());
+ conf.set(VespaConfiguration.DRYRUN, "true");
+ conf.set(VespaConfiguration.ENDPOINT, "endpoint-does-not-matter-in-dryrun");
+
+ cluster = new MiniDFSCluster.Builder(conf).build();
+ hdfs = FileSystem.get(conf);
+
+ metricsJsonPath = new Path("metrics_json");
+ metricsCsvPath = new Path("metrics_csv");
+ copyToHdfs("src/test/resources/operations_data.json", metricsJsonPath, "data");
+ copyToHdfs("src/test/resources/tabular_data.csv", metricsCsvPath, "data");
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ Path testDir = new Path(hdfsBaseDir.getParent());
+ hdfs.delete(testDir, true);
+ cluster.shutdown();
+ LocalFileSystem localFileSystem = FileSystem.getLocal(conf);
+ localFileSystem.delete(testDir, true);
+ }
+
+ @Test
+ public void requireThatMapOnlyJobSucceeds() throws Exception {
+ Job job = Job.getInstance(conf);
+ job.setJarByClass(MapReduceTest.class);
+ job.setMapperClass(FeedMapper.class);
+ job.setOutputFormatClass(VespaOutputFormat.class);
+ job.setMapOutputValueClass(Text.class);
+
+ FileInputFormat.setInputPaths(job, metricsJsonPath);
+
+ boolean success = job.waitForCompletion(true);
+ assertTrue("Job Failed", success);
+
+ VespaCounters counters = VespaCounters.get(job);
+ assertEquals(10, counters.getDocumentsSent());
+ assertEquals(0, counters.getDocumentsFailed());
+ assertEquals(10, counters.getDocumentsOk());
+ }
+
+ @Test
+ public void requireThatMapReduceJobSucceeds() throws Exception {
+ Job job = Job.getInstance(conf);
+ job.setJarByClass(MapReduceTest.class);
+ job.setMapperClass(FeedMapper.class);
+ job.setOutputFormatClass(VespaOutputFormat.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setReducerClass(FeedReducer.class);
+ job.setNumReduceTasks(1);
+
+ FileInputFormat.setInputPaths(job, metricsJsonPath);
+
+ boolean success = job.waitForCompletion(true);
+ assertTrue("Job Failed", success);
+
+ VespaCounters counters = VespaCounters.get(job);
+ assertEquals(10, counters.getDocumentsSent());
+ assertEquals(0, counters.getDocumentsFailed());
+ assertEquals(10, counters.getDocumentsOk());
+ }
+
+
+ @Test
+ public void requireThatTransformMapJobSucceeds() throws Exception {
+ Job job = Job.getInstance(conf);
+ job.setJarByClass(MapReduceTest.class);
+ job.setMapperClass(ParsingMapper.class);
+ job.setOutputFormatClass(VespaOutputFormat.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setReducerClass(FeedReducer.class);
+ job.setNumReduceTasks(1);
+
+ FileInputFormat.setInputPaths(job, metricsCsvPath);
+
+ boolean success = job.waitForCompletion(true);
+ assertTrue("Job Failed", success);
+
+ VespaCounters counters = VespaCounters.get(job);
+ assertEquals(10, counters.getDocumentsSent());
+ assertEquals(0, counters.getDocumentsFailed());
+ assertEquals(10, counters.getDocumentsOk());
+ assertEquals(0, counters.getDocumentsSkipped());
+ }
+
+
+ private static void copyToHdfs(String localFile, Path hdfsDir, String hdfsName) throws IOException {
+ Path hdfsPath = new Path(hdfsDir, hdfsName);
+ FSDataOutputStream out = hdfs.create(hdfsPath);
+
+ try (InputStream in = new BufferedInputStream(new FileInputStream(localFile))) {
+ int len;
+ byte[] buffer = new byte[1024];
+ while ((len = in.read(buffer)) > 0) {
+ out.write(buffer, 0, len);
+ }
+ } finally {
+ out.close();
+ }
+ }
+
+ public static class FeedMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ context.write(key, value);
+ }
+ }
+
+ public static class FeedReducer extends Reducer<Object, Text, LongWritable, Text> {
+ public void reduce(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ context.write(key, value);
+ }
+ }
+
+ public static class ParsingMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ String line = value.toString();
+ if (line == null || line.length() == 0)
+ return;
+
+ StringTokenizer tokenizer = new StringTokenizer(line);
+ long date = Long.parseLong(tokenizer.nextToken());
+ String metricName = tokenizer.nextToken();
+ long metricValue = Long.parseLong(tokenizer.nextToken());
+ String application = tokenizer.nextToken();
+
+ String docid = "id:"+application+":metric::"+metricName+"-"+date;
+
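+            // A row such as "2015110414 clicks 1 testapp" from tabular_data.csv yields a feed line like:
+            // {"fields":{"date":2015110414,"name":"clicks","value":1,"application":"testapp"},"put":"id:testapp:metric::clicks-2015110414"}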
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+
+ g.writeStartObject();
+ g.writeObjectFieldStart("fields");
+ g.writeNumberField("date", date);
+ g.writeStringField("name", metricName);
+ g.writeNumberField("value", metricValue);
+ g.writeStringField("application", application);
+ g.writeEndObject();
+ g.writeStringField("put", docid);
+ g.writeEndObject();
+ g.close();
+
+ context.write(key, new Text(out.toString()));
+ }
+ }
+
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
new file mode 100644
index 00000000000..0e1a8fba17c
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
@@ -0,0 +1,274 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+@SuppressWarnings("serial")
+public class VespaDocumentOperationTest {
+
+ @Test
+ public void requireThatUDFReturnsCorrectJson() throws Exception {
+ String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>");
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.path("fields");
+
+ // operation put is default
+ assertEquals("id:testapp:metrics::clicks-20160112", root.get("put").getTextValue());
+ assertEquals("testapp", fields.get("application").getTextValue());
+ assertEquals("clicks", fields.get("name").getTextValue());
+ assertEquals(3, fields.get("value").getIntValue());
+ }
+
+
+ @Test
+ public void requireThatUDFSupportsUpdateAssign() throws IOException {
+ String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "operation=update");
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.path("fields");
+
+ assertEquals("id:testapp:metrics::clicks-20160112", root.get("update").getTextValue());
+ assertEquals("testapp", fields.get("application").get("assign").getTextValue());
+ assertEquals("clicks", fields.get("name").get("assign").getTextValue());
+ assertEquals(3, fields.get("value").get("assign").getIntValue());
+ }
+
+
+ @Test
+ public void requireThatUDFReturnsNullForMissingConfig() throws Exception {
+ String json = getDocumentOperationJson();
+ assertNull(json);
+ }
+
+
+ @Test
+ public void requireThatUDFCorrectlyGeneratesRemoveOperation() throws Exception {
+ String json = getDocumentOperationJson("operation=remove", "docid=id:<application>:metrics::<name>-<date>");
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+
+ assertEquals("id:testapp:metrics::clicks-20160112", root.get("remove").getTextValue());
+ assertNull(fields);
+ }
+
+
+ @Test
+ public void requireThatUDFGeneratesComplexDataTypes() throws Exception {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ Tuple intTuple = TupleFactory.getInstance().newTuple();
+ int[] intArray = {1, 2, 3};
+ for (int i : intArray) { intTuple.append(i); }
+
+ Tuple stringTuple = TupleFactory.getInstance().newTuple();
+ String[] stringArray = {"a", "b", "c"};
+ for (String s : stringArray) { stringTuple.append(s); }
+
+ DataBag bag = new SortedDataBag(null);
+ bag.add(intTuple);
+ bag.add(stringTuple);
+
+ Map<String, Object> innerMap = new HashMap<String, Object>() {{
+ put("a", "string");
+ put("tuple", intTuple);
+ }};
+
+ DataByteArray bytes = new DataByteArray("testdata".getBytes());
+
+ Map<String, Object> outerMap = new HashMap<String, Object>() {{
+ put("string", "value");
+ put("int", 3);
+ put("float", 3.145);
+ put("bool", true);
+ put("byte", bytes);
+ put("map", innerMap);
+ put("bag", bag);
+ }};
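+        // The UDF is expected to serialize this structure under the "map" field: the byte array as
+        // base64, the inner map as a nested object and the bag as a nested array (see assertions below).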
+
+ addToTuple("map", DataType.MAP, outerMap, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty");
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+ JsonNode map = fields.get("map");
+
+ assertEquals("value", map.get("string").getTextValue());
+ assertEquals(3, map.get("int").getIntValue());
+ assertEquals(3.145, map.get("float").getDoubleValue(), 1e-6);
+ assertEquals(true, map.get("bool").getBooleanValue());
+ assertEquals("dGVzdGRhdGE=", map.get("byte").getTextValue());
+
+ assertEquals("string", map.get("map").get("a").getTextValue());
+ for (int i = 0; i < intArray.length; ++i) {
+ assertEquals(intArray[i], map.get("map").get("tuple").get(i).asInt());
+ }
+
+ JsonNode bagField = map.get("bag");
+ for (int i = 0; i < intArray.length; ++i) {
+ assertEquals(intArray[i], bagField.get(0).get(i).asInt());
+ }
+ for (int i = 0; i < stringArray.length; ++i) {
+ assertEquals(stringArray[i], bagField.get(1).get(i).asText());
+ }
+ }
+
+
+ @Test
+ public void requireThatSimpleArraysMustBeConfigured() throws Exception {
+ String[] stringArray = {"a", "b", "c"};
+ JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty"); // simple arrays not configured
+ // json: [["a"], ["b"], ["c"]]
+ assertEquals("a", array.get(0).get(0).asText());
+ assertEquals("b", array.get(1).get(0).asText());
+ assertEquals("c", array.get(2).get(0).asText());
+ }
+
+
+ @Test
+ public void requireThatSimpleArraysAreSupported() throws Exception {
+ String[] stringArray = {"a", "b", "c"};
+ JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=array");
+ // json: ["a", "b", "c"]
+ assertEquals("a", array.get(0).asText());
+ assertEquals("b", array.get(1).asText());
+ assertEquals("c", array.get(2).asText());
+ }
+
+
+ @Test
+ public void requireThatSimpleArraysCanBeConfiguredWithWildcard() throws Exception {
+ String[] stringArray = {"a", "b", "c"};
+ JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=*");
+ // json: ["a", "b", "c"]
+ assertEquals("a", array.get(0).asText());
+ assertEquals("b", array.get(1).asText());
+ assertEquals("c", array.get(2).asText());
+ }
+
+
+ @Test
+ public void requireThatMultipleSimpleArraysAreSupported() throws Exception {
+ String[] stringArray = {"a", "b", "c"};
+ JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=empty,array");
+ // json: ["a", "b", "c"]
+ assertEquals("a", array.get(0).asText());
+ assertEquals("b", array.get(1).asText());
+ assertEquals("c", array.get(2).asText());
+ }
+
+
+ private JsonNode setupSimpleArrayOperation(String name, String[] array, String... params) throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ DataBag bag = new SortedDataBag(null);
+ for (String s : array) {
+ Tuple stringTuple = TupleFactory.getInstance().newTuple();
+ stringTuple.append(s);
+ bag.add(stringTuple);
+ }
+ addToTuple(name, DataType.BAG, bag, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation(params);
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+ return fields.get(name);
+ }
+
+
+ @Test
+ public void requireThatUDFSupportsTensors() throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ // Please refer to the tensor format documentation
+
+ Map<String, Double> tensor = new HashMap<String, Double>() {{
+ put("x:label1,y:label2,z:label4", 2.0);
+ put("x:label3", 3.0);
+ }};
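+        // With 'create-tensor-fields=tensor' the map above is expected to be serialized as a Vespa
+        // tensor field, roughly:
+        // {"cells":[{"address":{"x":"label1","y":"label2","z":"label4"},"value":2.0},{"address":{"x":"label3"},"value":3.0}]}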
+
+ addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
+ addToTuple("tensor", DataType.MAP, tensor, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "create-tensor-fields=tensor");
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+ JsonNode tensorNode = fields.get("tensor");
+ JsonNode cells = tensorNode.get("cells");
+
+ assertEquals("label1", cells.get(0).get("address").get("x").asText());
+ assertEquals("label2", cells.get(0).get("address").get("y").asText());
+ assertEquals("label4", cells.get(0).get("address").get("z").asText());
+ assertEquals("label3", cells.get(1).get("address").get("x").asText());
+
+ assertEquals(2.0, cells.get(0).get("value").asDouble(), 1e-6);
+ assertEquals(3.0, cells.get(1).get("value").asDouble(), 1e-6);
+ }
+
+
+ @Test
+ public void requireThatUDFCanExcludeFields() throws IOException {
+ String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "exclude-fields=application,date");
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.path("fields");
+
+ // 'application' and 'date' fields should not appear in JSON
+ assertNull(fields.get("application"));
+ assertNull(fields.get("date"));
+ assertNotNull(fields.get("name"));
+ assertNotNull(fields.get("value"));
+ }
+
+
+ private String getDocumentOperationJson(String... params) throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ addToTuple("application", DataType.CHARARRAY, "testapp", schema, tuple);
+ addToTuple("name", DataType.CHARARRAY, "clicks", schema, tuple);
+ addToTuple("date", DataType.CHARARRAY, "20160112", schema, tuple);
+ addToTuple("value", DataType.CHARARRAY, 3, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation(params);
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ return json;
+ }
+
+
+ private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) {
+ schema.add(new Schema.FieldSchema(alias, type));
+ tuple.append(value);
+ }
+
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
new file mode 100644
index 00000000000..0f123904cfb
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
@@ -0,0 +1,96 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.sun.net.httpserver.HttpServer;
+import com.yahoo.vespa.hadoop.util.MockQueryHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class VespaQueryTest {
+
+ @Test
+ public void requireThatQueriesAreReturnedCorrectly() throws Exception {
+ runQueryTest("src/test/pig/query.pig", createQueryHandler(""), 18901);
+ }
+
+ @Test
+ public void requireThatQueriesAreReturnedCorrectlyWithAlternativeJsonRoot() throws Exception {
+ runQueryTest("src/test/pig/query_alt_root.pig", createQueryHandler("children"), 18902);
+ }
+
+ private void runQueryTest(String script, MockQueryHandler queryHandler, int port) throws Exception {
+ final String endpoint = "http://localhost:" + port;
+
+ HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
+ server.createContext("/", queryHandler);
+ server.start();
+
+ PigServer ps = setup(script, endpoint);
+
+ Iterator<Tuple> recommendations = ps.openIterator("recommendations");
+ while (recommendations.hasNext()) {
+ Tuple tuple = recommendations.next();
+
+ String userid = (String) tuple.get(0);
+ Integer rank = (Integer) tuple.get(1);
+ String docid = (String) tuple.get(2);
+ Double relevance = (Double) tuple.get(3);
+ String fieldId = (String) tuple.get(4);
+ String fieldContent = (String) tuple.get(5);
+
+ MockQueryHandler.MockQueryHit hit = queryHandler.getHit(userid, rank);
+ assertEquals(docid, hit.id);
+ assertEquals(relevance, hit.relevance, 1e-3);
+ assertEquals(fieldId, hit.fieldId);
+ assertEquals(fieldContent, hit.fieldContent);
+ }
+
+        server.stop(0);
+
+ }
+
+ private PigServer setup(String script, String endpoint) throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ Map<String, String> parameters = new HashMap<>();
+ parameters.put("ENDPOINT", endpoint);
+
+ PigServer ps = new PigServer(ExecType.LOCAL, conf);
+ ps.setBatchOn();
+ ps.registerScript(script, parameters);
+
+ return ps;
+ }
+
+ private MockQueryHandler createQueryHandler(String childNode) {
+ MockQueryHandler queryHandler = new MockQueryHandler(childNode);
+
+ List<String> userIds = Arrays.asList("5", "104", "313");
+
+ int hitsPerUser = 3;
+ for (int i = 0; i < hitsPerUser * userIds.size(); ++i) {
+ String id = "" + (i+1);
+ String userId = userIds.get(i / hitsPerUser);
+ queryHandler.newHit().
+ setId("id::::" + id).
+ setRelevance(1.0 - (i % hitsPerUser) * 0.1).
+ setFieldSddocname("doctype").
+ setFieldId("" + id).
+ setFieldDate("2016060" + id).
+ setFieldContent("Content for user " + userId + " hit " + i % hitsPerUser + "...").
+ add(userId);
+ }
+
+ return queryHandler;
+ }
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
new file mode 100644
index 00000000000..322c729b8c5
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
@@ -0,0 +1,110 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
+import org.junit.Test;
+
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+
+
+public class VespaStorageTest {
+
+ @Test
+ public void requireThatPremadeXmlOperationsFeedSucceeds() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.set(VespaConfiguration.DATA_FORMAT, "xml");
+ assertAllDocumentsOk("src/test/pig/feed_operations_xml.pig", conf);
+ }
+
+
+ @Test
+ public void requireThatPremadeOperationsFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_operations.pig");
+ }
+
+
+ @Test
+ public void requireThatPremadeMultilineOperationsFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_multiline_operations.pig");
+ }
+
+
+ @Test
+ public void requireThatPremadeOperationsWithJsonLoaderFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig");
+ }
+
+
+ @Test
+ public void requireThatCreateOperationsFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_create_operations.pig");
+ }
+
+
+ @Test
+ public void requireThatCreateOperationsShortFormFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_create_operations_short_form.pig");
+ }
+
+
+ @Test
+ public void requireThatFeedVisitDataSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_visit_data.pig");
+ }
+
+
+ private PigServer setup(String script, Configuration conf) throws Exception {
+ if (conf == null) {
+ conf = new HdfsConfiguration();
+ }
+ conf.setIfUnset(VespaConfiguration.DRYRUN, "true");
+ conf.setIfUnset(VespaConfiguration.ENDPOINT, "dummy-endpoint");
+
+ // Parameter substitutions - can also be set by configuration
+ Map<String, String> parameters = new HashMap<>();
+ parameters.put("ENDPOINT", "endpoint-does-not-matter-in-dryrun,another-endpoint-that-does-not-matter");
+
+ PigServer ps = new PigServer(ExecType.LOCAL, conf);
+ ps.setBatchOn();
+ ps.registerScript(script, parameters);
+
+ return ps;
+ }
+
+
+ private void assertAllDocumentsOk(String script) throws Exception {
+ assertAllDocumentsOk(script, null);
+ }
+
+
+ private void assertAllDocumentsOk(String script, Configuration conf) throws Exception {
+ PigServer ps = setup(script, conf);
+ List<ExecJob> jobs = ps.executeBatch();
+ PigStats stats = jobs.get(0).getStatistics();
+ for (JobStats js : stats.getJobGraph()) {
+ Counters hadoopCounters = ((MRJobStats)js).getHadoopCounters();
+ assertNotNull(hadoopCounters);
+ VespaCounters counters = VespaCounters.get(hadoopCounters);
+ assertEquals(10, counters.getDocumentsSent());
+ assertEquals(0, counters.getDocumentsFailed());
+ assertEquals(10, counters.getDocumentsOk());
+ }
+ }
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java
new file mode 100644
index 00000000000..0bf9f6b447e
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java
@@ -0,0 +1,218 @@
+package com.yahoo.vespa.hadoop.util;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MockQueryHandler implements HttpHandler {
+
+ private final Map<String, List<MockQueryHit>> hitMap;
+ private final String childNode;
+
+ public MockQueryHandler(String childNode) {
+ this.hitMap = new HashMap<>();
+ this.childNode = childNode;
+ }
+
+ public void handle(HttpExchange t) throws IOException {
+ URI uri = t.getRequestURI();
+ String query = uri.getQuery();
+ String response = null;
+
+ // Parse query - extract "query" element
+ if (query != null) {
+            String[] params = query.split("&");
+            for (String param : params) {
+                int i = param.indexOf('=');
+                String name = param.substring(0, i);
+                String value = URLDecoder.decode(param.substring(i + 1), "UTF-8");
+
+                if ("query".equalsIgnoreCase(name)) {
+                    response = getResponse(value);
+                }
+ }
+ }
+
+        byte[] responseBytes = response == null ? new byte[0] : response.getBytes("UTF-8");
+        t.sendResponseHeaders(200, responseBytes.length);
+        OutputStream os = t.getResponseBody();
+        os.write(responseBytes);
+        os.close();
+
+ }
+
+ public MockQueryHit getHit(String query, Integer rank) {
+ if (!hitMap.containsKey(query)) {
+ return null;
+ }
+ if (rank >= hitMap.get(query).size()) {
+ return null;
+ }
+ return hitMap.get(query).get(rank);
+ }
+
+ public MockQueryHit newHit() {
+ return new MockQueryHit(this);
+ }
+
+ public void addHit(String query, MockQueryHit hit) {
+ if (!hitMap.containsKey(query)) {
+ hitMap.put(query, new ArrayList<>());
+ }
+ hitMap.get(query).add(hit);
+ }
+
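+    // Builds a minimal Vespa search result for the hits registered under the given query, on the form
+    // {"root":{"id":"toplevel","relevance":1,"fields":{"totalCount":N},"coverage":{...},"children":[<hits>]}},
+    // optionally wrapping the hits in an extra child node when childNode is set (used to exercise
+    // the 'rootnode' option of VespaQuery).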
+ private String getResponse(String query) throws IOException {
+ List<MockQueryHit> hits = hitMap.get(query);
+ if (hits == null) {
+ return null;
+ }
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+
+ writeResultStart(g, hits.size());
+ for (MockQueryHit hit : hits) {
+ writeHit(g, hit);
+ }
+ writeResultsEnd(g);
+ g.close();
+
+ return out.toString();
+ }
+
+ private void writeHit(JsonGenerator g, MockQueryHit hit) throws IOException {
+ g.writeStartObject();
+
+ g.writeFieldName("id");
+ g.writeString(hit.id);
+
+ g.writeFieldName("relevance");
+ g.writeNumber(hit.relevance);
+
+ g.writeFieldName("fields");
+ g.writeStartObject();
+
+ g.writeFieldName("sddocname");
+ g.writeString(hit.fieldSddocname);
+
+ g.writeFieldName("date");
+ g.writeString(hit.fieldDate);
+
+ g.writeFieldName("content");
+ g.writeString(hit.fieldContent);
+
+ g.writeFieldName("id");
+ g.writeString(hit.fieldId);
+
+ g.writeEndObject();
+ g.writeEndObject();
+ }
+
+ private void writeResultStart(JsonGenerator g, int count) throws IOException {
+ g.writeStartObject();
+ g.writeFieldName("root");
+
+ g.writeStartObject();
+
+ g.writeFieldName("id");
+ g.writeString("toplevel");
+
+ g.writeFieldName("relevance");
+ g.writeNumber(1);
+
+ g.writeFieldName("fields");
+ g.writeStartObject();
+ g.writeFieldName("totalCount");
+ g.writeNumber(count);
+ g.writeEndObject();
+
+ g.writeFieldName("coverage");
+ g.writeStartObject();
+ g.writeFieldName("coverage");
+ g.writeNumber(100);
+ // ... more stuff here usually
+ g.writeEndObject();
+
+ g.writeFieldName("children");
+ g.writeStartArray();
+
+ if (!childNode.isEmpty()) {
+ g.writeStartObject();
+ g.writeFieldName(childNode);
+ g.writeStartArray();
+ }
+ }
+
+ private void writeResultsEnd(JsonGenerator g) throws IOException {
+ if (!childNode.isEmpty()) {
+ g.writeEndArray();
+ g.writeEndObject();
+ }
+ g.writeEndArray();
+ g.writeEndObject();
+ g.writeEndObject();
+ }
+
+ public static class MockQueryHit {
+
+ private final MockQueryHandler handler;
+
+ public String id;
+ public Double relevance;
+ public String fieldSddocname;
+ public String fieldDate;
+ public String fieldContent;
+ public String fieldId;
+
+ private MockQueryHit(MockQueryHandler handler) {
+ this.handler = handler;
+ }
+
+ public void add(String query) {
+ handler.addHit(query, this);
+ }
+
+ public MockQueryHit setId(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public MockQueryHit setRelevance(Double relevance) {
+ this.relevance = relevance;
+ return this;
+ }
+
+ public MockQueryHit setFieldSddocname(String fieldSddocname) {
+ this.fieldSddocname = fieldSddocname;
+ return this;
+ }
+
+ public MockQueryHit setFieldDate(String fieldDate) {
+ this.fieldDate = fieldDate;
+ return this;
+ }
+
+ public MockQueryHit setFieldContent(String fieldContent) {
+ this.fieldContent = fieldContent;
+ return this;
+ }
+
+ public MockQueryHit setFieldId(String fieldId) {
+ this.fieldId = fieldId;
+ return this;
+ }
+ }
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
new file mode 100644
index 00000000000..c98e7b1c02c
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
@@ -0,0 +1,50 @@
+package com.yahoo.vespa.hadoop.util;
+
+import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TupleToolsTest {
+
+ @Test
+ public void requireThatTupleToStringHandlesSimpleTypes() throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
+ addToTuple("rank", DataType.INTEGER, 1, schema, tuple);
+
+ String template = "Id is <id> and rank is <rank>";
+ String result = TupleTools.toString(schema, tuple, template);
+
+ assertEquals("Id is 123 and rank is 1", result);
+ }
+
+
+ private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) {
+ schema.add(new Schema.FieldSchema(alias, type));
+ tuple.append(value);
+ }
+
+ @Test
+ public void requireThatTupleToStringHandlesStringCharacters() throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ addToTuple("id", DataType.CHARARRAY, "_!@#$%^&*()", schema, tuple);
+ addToTuple("rank", DataType.INTEGER, 1, schema, tuple);
+
+ String template = "Id is <id> and rank is <rank>";
+ String result = TupleTools.toString(schema, tuple, template);
+
+ assertEquals("Id is _!@#$%^&*() and rank is 1", result);
+ }
+
+}
diff --git a/vespa-hadoop/src/test/pig/feed_create_operations.pig b/vespa-hadoop/src/test/pig/feed_create_operations.pig
new file mode 100644
index 00000000000..2186935b59a
--- /dev/null
+++ b/vespa-hadoop/src/test/pig/feed_create_operations.pig
@@ -0,0 +1,23 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Create valid Vespa put operations
+DEFINE VespaPutOperation
+ com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
+ 'operation=put',
+ 'docid=id:<application>:metrics::<name>-<date>'
+ );
+
+-- By default, VespaStorage assumes it's feeding valid Vespa operations
+DEFINE VespaStorage
+ com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load tabular data
+metrics = LOAD 'src/test/resources/tabular_data.csv' AS (date:chararray, name:chararray, value:int, application:chararray);
+
+-- Transform tabular data to a Vespa document operation JSON format
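+-- Each row, e.g. (2015110414, clicks, 1, testapp), becomes a single-field tuple holding a JSON
+-- operation along the lines of (field order may vary):
+-- {"put":"id:testapp:metrics::clicks-2015110414","fields":{"date":"2015110414","name":"clicks","value":1,"application":"testapp"}}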
+metrics = FOREACH metrics GENERATE VespaPutOperation(*);
+
+-- Store into Vespa
+STORE metrics INTO '$ENDPOINT' USING VespaStorage();
+
+
diff --git a/vespa-hadoop/src/test/pig/feed_create_operations_short_form.pig b/vespa-hadoop/src/test/pig/feed_create_operations_short_form.pig
new file mode 100644
index 00000000000..348616f00ad
--- /dev/null
+++ b/vespa-hadoop/src/test/pig/feed_create_operations_short_form.pig
@@ -0,0 +1,18 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Transform tabular data to a Vespa document operation JSON format
+-- as part of storing the data.
+DEFINE VespaStorage
+ com.yahoo.vespa.hadoop.pig.VespaStorage(
+ 'create-document-operation=true',
+ 'operation=put',
+ 'docid=id:<application>:metrics::<name>-<date>'
+ );
+
+-- Load tabular data
+metrics = LOAD 'src/test/resources/tabular_data.csv' AS (date:chararray, name:chararray, value:int, application:chararray);
+
+-- Store into Vespa
+STORE metrics INTO '$ENDPOINT' USING VespaStorage();
+
+
diff --git a/vespa-hadoop/src/test/pig/feed_multiline_operations.pig b/vespa-hadoop/src/test/pig/feed_multiline_operations.pig
new file mode 100644
index 00000000000..e9efb36858b
--- /dev/null
+++ b/vespa-hadoop/src/test/pig/feed_multiline_operations.pig
@@ -0,0 +1,14 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Define short name for VespaJsonLoader
+DEFINE VespaJsonLoader com.yahoo.vespa.hadoop.pig.VespaSimpleJsonLoader();
+
+-- Define short name for VespaStorage
+DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load data - one column for json data
+metrics = LOAD 'src/test/resources/operations_multiline_data.json' USING VespaJsonLoader() AS (data:chararray);
+
+-- Store into Vespa
+STORE metrics INTO '$ENDPOINT' USING VespaStorage();
+
diff --git a/vespa-hadoop/src/test/pig/feed_operations.pig b/vespa-hadoop/src/test/pig/feed_operations.pig
new file mode 100644
index 00000000000..327181d4410
--- /dev/null
+++ b/vespa-hadoop/src/test/pig/feed_operations.pig
@@ -0,0 +1,10 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Define short name for VespaStorage
+DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load data - one column for json data
+metrics = LOAD 'src/test/resources/operations_data.json' AS (data:chararray);
+
+-- Store into Vespa
+STORE metrics INTO '$ENDPOINT' USING VespaStorage(); \ No newline at end of file
diff --git a/vespa-hadoop/src/test/pig/feed_operations_with_json_loader.pig b/vespa-hadoop/src/test/pig/feed_operations_with_json_loader.pig
new file mode 100644
index 00000000000..6d31201e4eb
--- /dev/null
+++ b/vespa-hadoop/src/test/pig/feed_operations_with_json_loader.pig
@@ -0,0 +1,13 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Define short name for VespaJsonLoader
+DEFINE VespaJsonLoader com.yahoo.vespa.hadoop.pig.VespaSimpleJsonLoader();
+
+-- Define short name for VespaStorage
+DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load data - one column for json data
+metrics = LOAD 'src/test/resources/operations_data.json' USING VespaJsonLoader() AS (data:chararray);
+
+-- Store into Vespa
+STORE metrics INTO '$ENDPOINT' USING VespaStorage(); \ No newline at end of file
diff --git a/vespa-hadoop/src/test/pig/feed_operations_xml.pig b/vespa-hadoop/src/test/pig/feed_operations_xml.pig
new file mode 100644
index 00000000000..d109d56ad1e
--- /dev/null
+++ b/vespa-hadoop/src/test/pig/feed_operations_xml.pig
@@ -0,0 +1,10 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Define short name for VespaStorage
+DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load data - one column for xml data
+data = LOAD 'src/test/resources/operations_data.xml' AS (data:chararray);
+
+-- Store into Vespa
+STORE data INTO '$ENDPOINT' USING VespaStorage(); \ No newline at end of file
diff --git a/vespa-hadoop/src/test/pig/feed_visit_data.pig b/vespa-hadoop/src/test/pig/feed_visit_data.pig
new file mode 100644
index 00000000000..14010c38336
--- /dev/null
+++ b/vespa-hadoop/src/test/pig/feed_visit_data.pig
@@ -0,0 +1,11 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Define short name for VespaStorage
+DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load data - one column for json data
+metrics = LOAD 'src/test/resources/visit_data.json' AS (data:chararray);
+
+-- Store into Vespa
+STORE metrics INTO '$ENDPOINT' USING VespaStorage();
+
diff --git a/vespa-hadoop/src/test/pig/query.pig b/vespa-hadoop/src/test/pig/query.pig
new file mode 100644
index 00000000000..70f53a992e2
--- /dev/null
+++ b/vespa-hadoop/src/test/pig/query.pig
@@ -0,0 +1,18 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Define Vespa query for retrieving blog posts
+DEFINE BlogPostRecommendations
+ com.yahoo.vespa.hadoop.pig.VespaQuery(
+ 'query=$ENDPOINT/search?query=<userid>&hits=100',
+ 'schema=rank:int,id:chararray,relevance:double,fields/id:chararray,fields/content:chararray'
+ );
+
+-- Load data from a local file
+users = LOAD 'src/test/resources/user_ids.csv' AS (userid:chararray);
+users = FILTER users BY userid IS NOT null;
+
+-- Run a set of queries against Vespa
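+-- After flattening, each output tuple should have the form
+-- (userid, rank, id, relevance, fields/id, fields/content), matching the schema given above.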
+recommendations = FOREACH users GENERATE userid, FLATTEN(BlogPostRecommendations(*));
+
+-- Output recommendations
+DUMP recommendations;
diff --git a/vespa-hadoop/src/test/pig/query_alt_root.pig b/vespa-hadoop/src/test/pig/query_alt_root.pig
new file mode 100644
index 00000000000..8995990e398
--- /dev/null
+++ b/vespa-hadoop/src/test/pig/query_alt_root.pig
@@ -0,0 +1,19 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Define Vespa query for retrieving blog posts
+DEFINE BlogPostRecommendations
+ com.yahoo.vespa.hadoop.pig.VespaQuery(
+ 'query=$ENDPOINT/search?query=<userid>&hits=100',
+ 'rootnode=root/children/children',
+ 'schema=rank:int,id:chararray,relevance:double,fields/id:chararray,fields/content:chararray'
+ );
+
+-- Load data from a local file
+users = LOAD 'src/test/resources/user_ids.csv' AS (userid:chararray);
+users = FILTER users BY userid IS NOT null;
+
+-- Run a set of queries against Vespa
+recommendations = FOREACH users GENERATE userid, FLATTEN(BlogPostRecommendations(*));
+
+-- Output recommendations
+DUMP recommendations;
diff --git a/vespa-hadoop/src/test/resources/operations_data.json b/vespa-hadoop/src/test/resources/operations_data.json
new file mode 100644
index 00000000000..5af436dbfe7
--- /dev/null
+++ b/vespa-hadoop/src/test/resources/operations_data.json
@@ -0,0 +1,10 @@
+{"put":"id:testapp:metric::clicks-2015110414","fields":{"date":"2015110414","name":"clicks","value":1,"application":"testapp"}}
+{"fields":{"date":"2015110416","name":"clicks","value":5,"application":"testapp"},"put":"id:testapp:metric::clicks-2015110416"}
+{"put":"id:testapp:metric::clicks-2015110415","fields":{"date":"2015110415","name":"clicks","value":2,"application":"testapp"}}
+{"put":"id:testapp:metric::clicks-2015110417","fields":{"date":"2015110417","name":"clicks","value":3,"application":"testapp"}}
+{"put":"id:testapp:metric::clicks-2015110418","fields":{"date":"2015110418","name":"clicks","value":6,"application":"testapp"}}
+{"put":"id:testapp:metric::clicks-2015110419","fields":{"date":"2015110419","name":"clicks","value":3,"application":"testapp"}}
+{"put":"id:testapp:metric::clicks-2015110420","fields":{"date":"2015110420","name":"clicks","value":4,"application":"testapp"}}
+{"put":"id:testapp:metric::clicks-2015110421","fields":{"date":"2015110421","name":"clicks","value":2,"application":"testapp"}}
+{"fields":{"date":"2015110422","name":"clicks","value":5,"application":"testapp"},"condition":"metrics==0","put":"id:testapp:metric::clicks-2015110422"}
+{"put":"id:testapp:metric::clicks-2015110423","fields":{"date":"2015110423","name":"clicks","value":1,"application":"testapp"}}
diff --git a/vespa-hadoop/src/test/resources/operations_data.xml b/vespa-hadoop/src/test/resources/operations_data.xml
new file mode 100644
index 00000000000..cdf1ca78c1d
--- /dev/null
+++ b/vespa-hadoop/src/test/resources/operations_data.xml
@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="utf-8"?>
+<vespafeed>
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/a-ha/Scoundrel+Days"> <url>http://music.yahoo.com/a-ha/Scoundrel+Days</url> <title><![CDATA[Scoundrel Days]]></title> <artist><![CDATA[a-ha]]></artist> <year>0</year> <popularity>290</popularity> </document>
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Accept/Restless+And+Wild"> <url>http://music.yahoo.com/Accept/Restless+And+Wild</url> <title><![CDATA[Restless And Wild]]></title> <artist><![CDATA[Accept]]></artist> <year>0</year> <popularity>75</popularity> </document>
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Accept/Staying+A+Life"> <url>http://music.yahoo.com/Accept/Staying+A+Life</url> <title><![CDATA[Staying A Life]]></title> <artist><![CDATA[Accept]]></artist> <year>1985</year> <popularity>77</popularity> </document>
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Alice+In+Chains/Dirt"> <url>http://music.yahoo.com/Alice+In+Chains/Dirt</url> <title><![CDATA[Dirt]]></title> <artist><![CDATA[Alice In Chains]]></artist> <year>1992</year> <popularity>114</popularity> </document>
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Alice+In+Chains/Live"> <url>http://music.yahoo.com/Alice+In+Chains/Live</url> <title><![CDATA[Live]]></title> <artist><![CDATA[Alice In Chains]]></artist> <year>1990</year> <popularity>363</popularity> </document>
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Amy+MacDonald/This+Is+The+Life"> <url>http://music.yahoo.com/Amy+MacDonald/This+Is+The+Life</url> <title><![CDATA[This Is The Life]]></title> <artist><![CDATA[Amy MacDonald]]></artist> <year>2007</year> <popularity>355</popularity> </document>
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Ane+Brun/Duets"> <url>http://music.yahoo.com/Ane+Brun/Duets</url> <title><![CDATA[Duets]]></title> <artist><![CDATA[Ane Brun]]></artist> <year>0</year> <popularity>255</popularity> </document>
+ <update documenttype="music" documentid="id:music:music::http://music.yahoo.com/bobdylan/BestOf"><assign field="title">The Best of Bob Dylan</assign><add field="tracks"><item>Man Of Constant Sorrow</item></add></update>
+ <remove documentid="id:music:music::http://music.yahoo.com/Aqpop/Beautifully+Smart" />
+ <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Annuals/Be+He+Me"> <url>http://music.yahoo.com/Annuals/Be+He+Me</url> <title><![CDATA[Be He Me]]></title> <artist><![CDATA[Annuals]]></artist> <year>0</year> <popularity>207</popularity> </document>
+</vespafeed>
diff --git a/vespa-hadoop/src/test/resources/operations_multiline_data.json b/vespa-hadoop/src/test/resources/operations_multiline_data.json
new file mode 100644
index 00000000000..2b51698d9b7
--- /dev/null
+++ b/vespa-hadoop/src/test/resources/operations_multiline_data.json
@@ -0,0 +1,93 @@
+[
+ {
+ "put": "id:testapp:metric::clicks-2015110414",
+ "fields": {
+ "date": "2015110414",
+ "name": "clicks",
+ "value": 1,
+ "application": "testapp"
+ }
+ },
+ {
+ "fields": {
+ "date": "2015110416",
+ "name": "clicks",
+ "value": 5,
+ "application": "testapp"
+ },
+ "put": "id:testapp:metric::clicks-2015110416"
+ },
+ {
+ "put": "id:testapp:metric::clicks-2015110415",
+ "fields": {
+ "date": "2015110415",
+ "name": "clicks",
+ "value": 2,
+ "application": "testapp"
+ }
+ },
+ {
+ "put": "id:testapp:metric::clicks-2015110417",
+ "fields": {
+ "date": "2015110417",
+ "name": "clicks",
+ "value": 3,
+ "application": "testapp"
+ }
+ },
+ {
+ "put": "id:testapp:metric::clicks-2015110418",
+ "fields": {
+ "date": "2015110418",
+ "name": "clicks",
+ "value": 6,
+ "application": "testapp"
+ }
+ },
+ {
+ "put": "id:testapp:metric::clicks-2015110419",
+ "fields": {
+ "date": "2015110419",
+ "name": "clicks",
+ "value": 3,
+ "application": "testapp"
+ }
+ },
+ {
+ "put": "id:testapp:metric::clicks-2015110420",
+ "fields": {
+ "date": "2015110420",
+ "name": "clicks",
+ "value": 4,
+ "application": "testapp"
+ }
+ },
+ {
+ "put": "id:testapp:metric::clicks-2015110421",
+ "fields": {
+ "date": "2015110421",
+ "name": "clicks",
+ "value": 2,
+ "application": "testapp"
+ }
+ },
+ {
+ "fields": {
+ "date": "2015110422",
+ "name": "clicks",
+ "value": 5,
+ "application": "testapp"
+ },
+ "condition": "metrics==0",
+ "put": "id:testapp:metric::clicks-2015110422"
+ },
+ {
+ "put": "id:testapp:metric::clicks-2015110423",
+ "fields": {
+ "date": "2015110423",
+ "name": "clicks",
+ "value": 1,
+ "application": "testapp"
+ }
+ }
+]
diff --git a/vespa-hadoop/src/test/resources/tabular_data.csv b/vespa-hadoop/src/test/resources/tabular_data.csv
new file mode 100644
index 00000000000..541597998e9
--- /dev/null
+++ b/vespa-hadoop/src/test/resources/tabular_data.csv
@@ -0,0 +1,11 @@
+2015110414 clicks 1 testapp
+2015110415 clicks 2 testapp
+2015110416 clicks 5 testapp
+2015110417 clicks 3 testapp
+2015110418 clicks 6 testapp
+2015110419 clicks 3 testapp
+2015110420 clicks 4 testapp
+2015110421 clicks 2 testapp
+2015110422 clicks 5 testapp
+2015110423 clicks 1 testapp
+
diff --git a/vespa-hadoop/src/test/resources/user_ids.csv b/vespa-hadoop/src/test/resources/user_ids.csv
new file mode 100644
index 00000000000..5875a3b9a7c
--- /dev/null
+++ b/vespa-hadoop/src/test/resources/user_ids.csv
@@ -0,0 +1,4 @@
+5
+104
+313
+
diff --git a/vespa-hadoop/src/test/resources/visit_data.json b/vespa-hadoop/src/test/resources/visit_data.json
new file mode 100644
index 00000000000..a48fc9cf1c0
--- /dev/null
+++ b/vespa-hadoop/src/test/resources/visit_data.json
@@ -0,0 +1,12 @@
+[
+{"id":"id:testapp:metric::clicks-2015110414","fields":{"date":"2015110414","name":"clicks","value":1,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110415","fields":{"date":"2015110415","name":"clicks","value":2,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110416","fields":{"date":"2015110416","name":"clicks","value":4,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110417","fields":{"date":"2015110417","name":"clicks","value":3,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110418","fields":{"date":"2015110418","name":"clicks","value":6,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110419","fields":{"date":"2015110419","name":"clicks","value":3,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110420","fields":{"date":"2015110420","name":"clicks","value":4,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110421","fields":{"date":"2015110421","name":"clicks","value":2,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110422","fields":{"date":"2015110422","name":"clicks","value":7,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110423","fields":{"date":"2015110423","name":"clicks","value":1,"application":"testapp"}}
+]