aboutsummaryrefslogtreecommitdiffstats
path: root/hadoop/src
diff options
context:
space:
mode:
authorLester Solbakken <lesters@yahoo-inc.com>2016-06-30 15:47:16 +0200
committerLester Solbakken <lesters@yahoo-inc.com>2016-06-30 15:47:16 +0200
commit350b49ff517b7294dffdb41f4235c9055b49aef2 (patch)
tree3a701975fafef034e73c53af04033df75b4c1512 /hadoop/src
parent9d15989bfe2a27b3555f2df1e9f92504f220ff2f (diff)
Add Vespa-Hadoop library
Diffstat (limited to 'hadoop/src')
-rw-r--r--hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java36
-rw-r--r--hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java51
-rw-r--r--hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java172
-rw-r--r--hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java69
-rw-r--r--hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java134
-rw-r--r--hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java104
-rw-r--r--hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java80
-rw-r--r--hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java113
-rw-r--r--hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java335
-rw-r--r--hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java111
-rw-r--r--hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java186
-rw-r--r--hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java196
-rw-r--r--hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java257
-rw-r--r--hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java106
-rw-r--r--hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java80
-rw-r--r--hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java206
-rw-r--r--hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java36
-rw-r--r--hadoop/src/test/pig/feed_create_operations.pig23
-rw-r--r--hadoop/src/test/pig/feed_create_operations_short_form.pig18
-rw-r--r--hadoop/src/test/pig/feed_operations.pig10
-rw-r--r--hadoop/src/test/pig/feed_visit_data.pig11
-rw-r--r--hadoop/src/test/pig/query.pig18
-rw-r--r--hadoop/src/test/resources/operations_data.json10
-rw-r--r--hadoop/src/test/resources/tabular_data.csv11
-rw-r--r--hadoop/src/test/resources/user_ids.csv4
-rw-r--r--hadoop/src/test/resources/visit_data.json12
26 files changed, 2389 insertions, 0 deletions
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java
new file mode 100644
index 00000000000..cd0f65496d0
--- /dev/null
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java
@@ -0,0 +1,36 @@
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * The output committer describes the commit task output for a Map-Reduce
+ * job. Not currently used, but is part of the Hadoop protocol since 2.7.
+ *
+ * @author lesters
+ */
+public class VespaOutputCommitter extends OutputCommitter {
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ }
+}
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
new file mode 100644
index 00000000000..5181ef05e2c
--- /dev/null
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
@@ -0,0 +1,51 @@
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import org.apache.hadoop.mapreduce.*;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * An output specification for writing to Vespa instances in a Map-Reduce job.
+ * Mainly returns an instance of a {@link VespaRecordWriter} that does the
+ * actual feeding to Vespa.
+ *
+ * @author lesters
+ */
+public class VespaOutputFormat extends OutputFormat {
+
+ private final Properties configOverride;
+
+ public VespaOutputFormat() {
+ super();
+ this.configOverride = null;
+ }
+
+ public VespaOutputFormat(Properties configOverride) {
+ super();
+ this.configOverride = configOverride;
+ }
+
+
+ @Override
+ public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+ VespaCounters counters = VespaCounters.get(context);
+ VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride);
+ return new VespaRecordWriter(configuration, counters);
+ }
+
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ return new VespaOutputCommitter();
+ }
+
+
+ @Override
+ public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+
+ }
+
+}
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
new file mode 100644
index 00000000000..48b1ab6a05e
--- /dev/null
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
@@ -0,0 +1,172 @@
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation;
+import com.yahoo.vespa.http.client.FeedClient;
+import com.yahoo.vespa.http.client.FeedClientFactory;
+import com.yahoo.vespa.http.client.Result;
+import com.yahoo.vespa.http.client.config.*;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.logging.Logger;
+
+/**
+ * VespaRecordWriter sends the output &lt;key, value&gt; to one or more Vespa
+ * endpoints.
+ *
+ * @author lesters
+ */
+public class VespaRecordWriter extends RecordWriter {
+
+ private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName());
+
+ private boolean initialized = false;
+ private FeedClient feedClient;
+
+ private final VespaCounters counters;
+ private final VespaConfiguration configuration;
+ private final int progressInterval;
+
+
+ VespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) {
+ this.counters = counters;
+ this.configuration = configuration;
+ this.progressInterval = configuration.progressInterval();
+ }
+
+
+ @Override
+ public void write(Object key, Object data) throws IOException, InterruptedException {
+ if (!initialized) {
+ initialize();
+ }
+
+ // Assumption: json - xml not currently supported
+ String json = data.toString().trim();
+
+ // Parse json to find document id - if none found, skip this write
+ String docId = findDocId(json);
+ if (docId != null && docId.length() >= 0) {
+ feedClient.stream(docId, json);
+ counters.incrementDocumentsSent(1);
+ } else {
+ counters.incrementDocumentsSkipped(1);
+ }
+
+ if (counters.getDocumentsSent() % progressInterval == 0) {
+ String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)",
+ counters.getDocumentsSent(),
+ counters.getDocumentsOk(),
+ counters.getDocumentsFailed(),
+ counters.getDocumentsSkipped());
+ log.info(progress);
+ }
+
+ }
+
+
+ @Override
+ public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ if (feedClient != null) {
+ feedClient.close();
+ }
+ }
+
+
+ private void initialize() {
+ ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder();
+ connParamsBuilder.setDryRun(configuration.dryrun());
+ connParamsBuilder.setUseCompression(configuration.useCompression());
+ connParamsBuilder.setEnableV3Protocol(configuration.useV3Protocol());
+ connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections());
+ if (configuration.proxyHost() != null) {
+ connParamsBuilder.setProxyHost(configuration.proxyHost());
+ }
+ if (configuration.proxyPort() >= 0) {
+ connParamsBuilder.setProxyPort(configuration.proxyPort());
+ }
+
+ SessionParams.Builder sessionParams = new SessionParams.Builder();
+ sessionParams.setThrottlerMinSize(configuration.throttlerMinSize());
+ sessionParams.setConnectionParams(connParamsBuilder.build());
+ sessionParams.setFeedParams(new FeedParams.Builder()
+ .setDataFormat(configuration.dataFormat())
+ .build());
+
+ String endpoints = configuration.endpoint();
+ StringTokenizer tokenizer = new StringTokenizer(endpoints, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String endpoint = tokenizer.nextToken().trim();
+ sessionParams.addCluster(new Cluster.Builder().addEndpoint(
+ Endpoint.create(endpoint, 4080, false)
+ ).build());
+ }
+
+ ResultCallback resultCallback = new ResultCallback(counters);
+ feedClient = FeedClientFactory.create(sessionParams.build(), resultCallback);
+
+ initialized = true;
+ }
+
+
+ private String findDocId(String json) throws IOException {
+ JsonFactory factory = new JsonFactory();
+ try(JsonParser parser = factory.createParser(json)) {
+ if (parser.nextToken() != JsonToken.START_OBJECT) {
+ return null;
+ }
+ while (parser.nextToken() != JsonToken.END_OBJECT) {
+ String fieldName = parser.getCurrentName();
+ parser.nextToken();
+ if (VespaDocumentOperation.Operation.valid(fieldName)) {
+ String docId = parser.getText();
+ return docId;
+ } else {
+ parser.skipChildren();
+ }
+ }
+ } catch (JsonParseException ex) {
+ return null;
+ }
+ return null;
+ }
+
+
+ static class ResultCallback implements FeedClient.ResultCallback {
+ final VespaCounters counters;
+
+ public ResultCallback(VespaCounters counters) {
+ this.counters = counters;
+ }
+
+ @Override
+ public void onCompletion(String docId, Result documentResult) {
+ if (!documentResult.isSuccess()) {
+ counters.incrementDocumentsFailed(1);
+ StringBuilder sb = new StringBuilder();
+ sb.append("Problems with docid ");
+ sb.append(docId);
+ sb.append(": ");
+ List<Result.Detail> details = documentResult.getDetails();
+ for (Result.Detail detail : details) {
+ sb.append(detail.toString());
+ sb.append(" ");
+ }
+ log.warning(sb.toString());
+ return;
+ }
+ counters.incrementDocumentsOk(1);
+ }
+
+ }
+
+}
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java
new file mode 100644
index 00000000000..cd692c69ba7
--- /dev/null
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java
@@ -0,0 +1,69 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class TupleTools {
+
+ private static final Pattern pattern = Pattern.compile("<([\\w]+)>");
+
+ public static Map<String, Object> tupleMap(Schema schema, Tuple tuple) throws IOException {
+ Map<String, Object> tupleMap = new HashMap<>((int)Math.ceil(tuple.size() / 0.75) + 1);
+ List<Schema.FieldSchema> schemas = schema.getFields();
+ for (int i = 0; i < schemas.size(); i++) {
+ Schema.FieldSchema field = schemas.get(i);
+ String alias = field.alias;
+ Object value = tuple.get(i);
+ if (value != null) {
+ tupleMap.put(alias, value);
+ }
+ }
+ return tupleMap;
+ }
+
+ public static Map<String, Object> tupleMap(ResourceSchema schema, Tuple tuple) throws IOException {
+ Map<String, Object> tupleMap = new HashMap<>((int)Math.ceil(tuple.size() / 0.75) + 1);
+ ResourceSchema.ResourceFieldSchema[] schemas = schema.getFields();
+ for (int i = 0; i < schemas.length; i++) {
+ ResourceSchema.ResourceFieldSchema field = schemas[i];
+ String alias = field.getName();
+ Object value = tuple.get(i);
+ if (value != null) {
+ tupleMap.put(alias, value);
+ }
+ }
+ return tupleMap;
+ }
+
+ public static String toString(Schema schema, Tuple tuple, String template) throws IOException {
+ return toString(tupleMap(schema, tuple), template);
+ }
+
+ public static String toString(Map<String,Object> fields, String template) {
+ if (template == null || template.length() == 0) {
+ return template;
+ }
+ if (fields == null || fields.size() == 0) {
+ return template;
+ }
+
+ Matcher m = pattern.matcher(template);
+ StringBuffer sb = new StringBuffer();
+ while (m.find()) {
+ Object value = fields.get(m.group(1));
+ String replacement = value != null ? value.toString() : m.group(0);
+ m.appendReplacement(sb, replacement);
+ }
+ m.appendTail(sb);
+ return sb.toString();
+ }
+
+}
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
new file mode 100644
index 00000000000..f02e4216a9d
--- /dev/null
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
@@ -0,0 +1,134 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import com.yahoo.vespa.http.client.config.FeedParams;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Properties;
+
+public class VespaConfiguration {
+
+ public static final String ENDPOINT = "vespa.feed.endpoint";
+ public static final String PROXY_HOST = "vespa.feed.proxy.host";
+ public static final String PROXY_PORT = "vespa.feed.proxy.port";
+ public static final String DRYRUN = "vespa.feed.dryrun";
+ public static final String USE_COMPRESSION = "vespa.feed.usecompression";
+ public static final String DATA_FORMAT = "vespa.feed.data.format";
+ public static final String PROGRESS_REPORT = "vespa.feed.progress.interval";
+ public static final String V3_PROTOCOL = "vespa.feed.v3.protocol";
+ public static final String CONNECTIONS = "vespa.feed.connections";
+ public static final String THROTTLER_MIN_SIZE = "vespa.feed.throttler.min.size";
+ public static final String QUERY_CONNECTION_TIMEOUT = "vespa.query.connection.timeout";
+
+ private final Configuration conf;
+ private final Properties override;
+
+ private VespaConfiguration(Configuration conf, Properties override) {
+ this.conf = conf;
+ this.override = override;
+ }
+
+
+ public static VespaConfiguration get(Configuration conf, Properties override) {
+ return new VespaConfiguration(conf, override);
+ }
+
+
+ public String endpoint() {
+ return getString(ENDPOINT);
+ }
+
+
+ public String proxyHost() {
+ return getString(PROXY_HOST);
+ }
+
+
+ public int proxyPort() {
+ return getInt(PROXY_PORT, 4080);
+ }
+
+
+ public boolean dryrun() {
+ return getBoolean(DRYRUN, false);
+ }
+
+
+ public boolean useCompression() {
+ return getBoolean(USE_COMPRESSION, true);
+ }
+
+
+ public boolean useV3Protocol() {
+ return getBoolean(V3_PROTOCOL, false);
+ }
+
+
+ public int numConnections() {
+ return getInt(CONNECTIONS, 8);
+ }
+
+
+ public int throttlerMinSize() {
+ return getInt(THROTTLER_MIN_SIZE, 0);
+ }
+
+
+ public int queryConnectionTimeout() {
+ return getInt(QUERY_CONNECTION_TIMEOUT, 10000);
+ }
+
+
+ public FeedParams.DataFormat dataFormat() {
+ String format = getString(DATA_FORMAT);
+ if ("xml".equalsIgnoreCase(format)) {
+ return FeedParams.DataFormat.XML_UTF8;
+ }
+ return FeedParams.DataFormat.JSON_UTF8;
+ }
+
+ public int progressInterval() {
+ return getInt(PROGRESS_REPORT, 1000);
+ }
+
+
+ private String getString(String name) {
+ if (override != null && override.containsKey(name)) {
+ return override.getProperty(name);
+ }
+ return conf != null ? conf.get(name) : null;
+ }
+
+
+ private int getInt(String name, int defaultValue) {
+ if (override != null && override.containsKey(name)) {
+ return Integer.parseInt(override.getProperty(name));
+ }
+ return conf != null ? conf.getInt(name, defaultValue) : defaultValue;
+ }
+
+
+ private boolean getBoolean(String name, boolean defaultValue) {
+ if (override != null && override.containsKey(name)) {
+ return Boolean.parseBoolean(override.getProperty(name));
+ }
+ return conf != null ? conf.getBoolean(name, defaultValue) : defaultValue;
+
+ }
+
+ public static Properties loadProperties(String... params) {
+ Properties properties = new Properties();
+ if (params != null) {
+ for (String s : params) {
+ try {
+ properties.load(new StringReader(s));
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ }
+ return properties;
+ }
+
+}
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java
new file mode 100644
index 00000000000..dbe47c23814
--- /dev/null
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java
@@ -0,0 +1,104 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class VespaCounters {
+
+ public static final String GROUP = "Vespa Feed Counters";
+ public static final String DOCS_OK = "Documents ok";
+ public static final String DOCS_SENT = "Documents sent";
+ public static final String DOCS_FAILED = "Documents failed";
+ public static final String DOCS_SKIPPED = "Documents skipped";
+
+ private final Counter documentsSent;
+ private final Counter documentsOk;
+ private final Counter documentsFailed;
+ private final Counter documentsSkipped;
+
+
+ private VespaCounters(Job job) throws IOException {
+ Counters counters = job.getCounters();
+ documentsSent = counters.findCounter(GROUP, DOCS_SENT);
+ documentsOk = counters.findCounter(GROUP, DOCS_OK);
+ documentsFailed = counters.findCounter(GROUP, DOCS_FAILED);
+ documentsSkipped = counters.findCounter(GROUP, DOCS_SKIPPED);
+ }
+
+
+ private VespaCounters(TaskAttemptContext context) {
+ documentsSent = context.getCounter(GROUP, DOCS_SENT);
+ documentsOk = context.getCounter(GROUP, DOCS_OK);
+ documentsFailed = context.getCounter(GROUP, DOCS_FAILED);
+ documentsSkipped = context.getCounter(GROUP, DOCS_SKIPPED);
+ }
+
+
+ private VespaCounters(org.apache.hadoop.mapred.Counters counters) {
+ documentsSent = counters.findCounter(GROUP, DOCS_SENT);
+ documentsOk = counters.findCounter(GROUP, DOCS_OK);
+ documentsFailed = counters.findCounter(GROUP, DOCS_FAILED);
+ documentsSkipped = counters.findCounter(GROUP, DOCS_SKIPPED);
+ }
+
+
+ public static VespaCounters get(Job job) throws IOException {
+ return new VespaCounters(job);
+ }
+
+
+ public static VespaCounters get(TaskAttemptContext context) {
+ return new VespaCounters(context);
+ }
+
+
+ public static VespaCounters get(org.apache.hadoop.mapred.Counters counters) {
+ return new VespaCounters(counters);
+
+ }
+
+
+ public long getDocumentsSent() {
+ return documentsSent.getValue();
+ }
+
+
+ public void incrementDocumentsSent(long incr) {
+ documentsSent.increment(incr);
+ }
+
+
+ public long getDocumentsOk() {
+ return documentsOk.getValue();
+ }
+
+
+ public void incrementDocumentsOk(long incr) {
+ documentsOk.increment(incr);
+ }
+
+
+ public long getDocumentsFailed() {
+ return documentsFailed.getValue();
+ }
+
+
+ public void incrementDocumentsFailed(long incr) {
+ documentsFailed.increment(incr);
+ }
+
+
+ public long getDocumentsSkipped() {
+ return documentsSkipped.getValue();
+ }
+
+
+ public void incrementDocumentsSkipped(long incr) {
+ documentsSkipped.increment(incr);
+ }
+
+}
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java
new file mode 100644
index 00000000000..d62ec99d1bd
--- /dev/null
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java
@@ -0,0 +1,80 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Scanner;
+
+public class VespaHttpClient {
+
+ private final HttpClient httpClient;
+
+ public VespaHttpClient() {
+ this(null);
+ }
+
+ public VespaHttpClient(VespaConfiguration configuration) {
+ httpClient = createClient(configuration);
+ }
+
+ public String get(String url) throws IOException {
+ HttpGet httpGet = new HttpGet(url);
+ HttpResponse httpResponse = httpClient.execute(httpGet);
+ if (httpResponse.getStatusLine().getStatusCode() != 200) {
+ return null;
+ }
+
+ HttpEntity entity = httpResponse.getEntity();
+ InputStream is = entity.getContent();
+
+ String result = "";
+ Scanner scanner = new Scanner(is, "UTF-8").useDelimiter("\\A");
+ if (scanner.hasNext()) {
+ result = scanner.next();
+ }
+ EntityUtils.consume(entity);
+
+ return result;
+ }
+
+ public JsonNode parseResultJson(String json) throws IOException {
+ if (json == null || json.isEmpty()) {
+ return null;
+ }
+ ObjectMapper m = new ObjectMapper();
+ JsonNode node = m.readTree(json);
+ if (node != null) {
+ node = node.get("root");
+ if (node != null) {
+ node = node.get("children");
+ }
+ }
+ return node;
+ }
+
+ private HttpClient createClient(VespaConfiguration configuration) {
+ HttpClientBuilder clientBuilder = HttpClientBuilder.create();
+
+ RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+ if (configuration != null) {
+ requestConfigBuilder.setSocketTimeout(configuration.queryConnectionTimeout());
+ requestConfigBuilder.setConnectTimeout(configuration.queryConnectionTimeout());
+ if (configuration.proxyHost() != null) {
+ requestConfigBuilder.setProxy(new HttpHost(configuration.proxyHost(), configuration.proxyPort()));
+ }
+ }
+ clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
+ return clientBuilder.build();
+ }
+
+}
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java
new file mode 100644
index 00000000000..0208b4165d3
--- /dev/null
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java
@@ -0,0 +1,113 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class VespaQuerySchema implements Iterable<VespaQuerySchema.AliasTypePair> {
+
+ private final List<AliasTypePair> tupleSchema = new ArrayList<>();
+
+ public VespaQuerySchema(String schema) {
+ for (String e : schema.split(",")) {
+ String[] pair = e.split(":");
+ String alias = pair[0].trim();
+ String type = pair[1].trim();
+ tupleSchema.add(new AliasTypePair(alias, type));
+ }
+ }
+
+ public Tuple buildTuple(int rank, JsonNode hit) {
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ for (VespaQuerySchema.AliasTypePair tupleElement : tupleSchema) {
+ String alias = tupleElement.getAlias();
+ Byte type = DataType.findTypeByName(tupleElement.getType());
+
+ // reserved word
+ if ("rank".equals(alias)) {
+ tuple.append(rank);
+ } else {
+ JsonNode field = hit;
+ String[] path = alias.split("/"); // move outside
+ for (String p : path) {
+ field = field.get(p);
+ if (field == null) {
+ type = DataType.NULL; // effectively skip field as it is not found
+ break;
+ }
+ }
+ switch (type) {
+ case DataType.BOOLEAN:
+ tuple.append(field.asBoolean());
+ break;
+ case DataType.INTEGER:
+ tuple.append(field.asInt());
+ break;
+ case DataType.LONG:
+ tuple.append(field.asLong());
+ break;
+ case DataType.FLOAT:
+ case DataType.DOUBLE:
+ tuple.append(field.asDouble());
+ break;
+ case DataType.DATETIME:
+ tuple.append(field.asText());
+ break;
+ case DataType.CHARARRAY:
+ tuple.append(field.asText());
+ break;
+ default:
+ // the rest of the data types are currently not supported
+ }
+ }
+ }
+ return tuple;
+ }
+
+ public static Schema getPigSchema(String schemaString) {
+ Schema schema = null;
+ schemaString = schemaString.replace("/", "_");
+ schemaString = "{(" + schemaString + ")}";
+ try {
+ schema = Utils.getSchemaFromString(schemaString);
+ } catch (ParserException e) {
+ e.printStackTrace();
+ }
+ return schema;
+ }
+
+ @Override
+ public Iterator<AliasTypePair> iterator() {
+ return tupleSchema.iterator();
+ }
+
+
+ public static class AliasTypePair {
+ private final String alias;
+ private final String type;
+
+ AliasTypePair(String alias, String type) {
+ this.alias = alias;
+ this.type = type;
+ }
+
+ public String getAlias() {
+ return alias;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ }
+
+}
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
new file mode 100644
index 00000000000..5ef40bde308
--- /dev/null
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
@@ -0,0 +1,335 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigWarning;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.joda.time.DateTime;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.*;
+
+/**
+ * A Pig UDF to convert simple Pig types into a valid Vespa JSON document format.
+ *
+ * @author lesters
+ */
+public class VespaDocumentOperation extends EvalFunc<String> {
+
+ public enum Operation {
+ PUT,
+ ID,
+ REMOVE,
+ UPDATE;
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase();
+ }
+
+ public static Operation fromString(String text) {
+ for (Operation op : Operation.values()) {
+ if (op.toString().equalsIgnoreCase(text)) {
+ return op;
+ }
+ }
+ throw new IllegalArgumentException("Unknown operation: " + text);
+ }
+
+ public static boolean valid(String text) {
+ for (Operation op : Operation.values()) {
+ if (op.toString().equalsIgnoreCase(text)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ }
+
+ private static final String PROPERTY_ID_TEMPLATE = "docid";
+ private static final String PROPERTY_OPERATION = "operation";
+ private static final String SIMPLE_ARRAY_FIELDS = "simple-array-fields";
+ private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields";
+
+ private static final String PARTIAL_UPDATE_ASSIGN = "assign";
+
+ private final String template;
+ private final Operation operation;
+ private final Properties properties;
+
+ public VespaDocumentOperation(String... params) {
+ properties = VespaConfiguration.loadProperties(params);
+ template = properties.getProperty(PROPERTY_ID_TEMPLATE);
+ operation = Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put"));
+ }
+
+ @Override
+ public String exec(Tuple tuple) throws IOException {
+ if (tuple == null || tuple.size() == 0) {
+ return null;
+ }
+ if (template == null || template.length() == 0) {
+ warn("No valid document id template found. Skipping.", PigWarning.UDF_WARNING_1);
+ return null;
+ }
+ if (operation == null) {
+ warn("No valid operation found. Skipping.", PigWarning.UDF_WARNING_1);
+ return null;
+ }
+
+ String json = null;
+
+ try {
+ if (reporter != null) {
+ reporter.progress();
+ }
+
+ Map<String, Object> fields = TupleTools.tupleMap(getInputSchema(), tuple);
+ String docId = TupleTools.toString(fields, template);
+
+ // create json
+ json = create(operation, docId, fields, properties);
+ if (json == null || json.length() == 0) {
+ warn("No valid document operation could be created.", PigWarning.UDF_WARNING_1);
+ return null;
+ }
+
+
+ } catch (Exception e) {
+ throw new IOException("Caught exception processing input row ", e);
+ }
+
+ return json;
+ }
+
+
+ /**
+ * Create a JSON Vespa document operation given the supplied fields,
+ * operation and document id template.
+ *
+ * @param op Operation (put, remove, update)
+ * @param docId Document id
+ * @param fields Fields to put in document operation
+ * @return A valid JSON Vespa document operation
+ * @throws IOException
+ */
+ public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties) throws IOException {
+ if (op == null) {
+ return null;
+ }
+ if (docId == null || docId.length() == 0) {
+ return null;
+ }
+ if (fields.isEmpty()) {
+ return null;
+ }
+
+ // create json format
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+ g.writeStartObject();
+
+ g.writeStringField(op.toString(), docId);
+
+ if (op != Operation.REMOVE) {
+ writeField("fields", fields, DataType.MAP, g, properties, op, 0);
+ }
+
+ g.writeEndObject();
+ g.close();
+
+ return out.toString();
+ }
+
+
+ @SuppressWarnings("unchecked")
+ private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Operation op, int depth) throws IOException {
+ g.writeFieldName(name);
+ if (shouldWritePartialUpdate(op, depth)) {
+ writePartialUpdate(value, type, g, name, properties, op, depth);
+ } else {
+ writeValue(value, type, g, name, properties, op, depth);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void writeValue(Object value, Byte type, JsonGenerator g, String name, Properties properties, Operation op, int depth) throws IOException {
+ switch (type) {
+ case DataType.UNKNOWN:
+ break;
+ case DataType.NULL:
+ g.writeNull();
+ break;
+ case DataType.BOOLEAN:
+ g.writeBoolean((boolean) value);
+ break;
+ case DataType.INTEGER:
+ g.writeNumber((int) value);
+ break;
+ case DataType.LONG:
+ g.writeNumber((long) value);
+ break;
+ case DataType.FLOAT:
+ g.writeNumber((float) value);
+ break;
+ case DataType.DOUBLE:
+ g.writeNumber((double) value);
+ break;
+ case DataType.DATETIME:
+ g.writeNumber(((DateTime) value).getMillis());
+ break;
+ case DataType.BYTEARRAY:
+ DataByteArray bytes = (DataByteArray) value;
+ String raw = Base64.getEncoder().encodeToString(bytes.get());
+ g.writeString(raw);
+ break;
+ case DataType.CHARARRAY:
+ g.writeString((String) value);
+ break;
+ case DataType.BIGINTEGER:
+ g.writeNumber((BigInteger) value);
+ break;
+ case DataType.BIGDECIMAL:
+ g.writeNumber((BigDecimal) value);
+ break;
+ case DataType.MAP:
+ g.writeStartObject();
+ Map<Object, Object> map = (Map<Object, Object>) value;
+ if (shouldCreateTensor(map, name, properties)) {
+ writeTensor(map, g);
+ } else {
+ for (Map.Entry<Object, Object> entry : map.entrySet()) {
+ String k = entry.getKey().toString();
+ Object v = entry.getValue();
+ Byte t = DataType.findType(v);
+ writeField(k, v, t, g, properties, op, depth+1);
+ }
+ }
+ g.writeEndObject();
+ break;
+ case DataType.TUPLE:
+ Tuple tuple = (Tuple) value;
+ boolean writeStartArray = shouldWriteTupleStart(tuple, name, properties);
+ if (writeStartArray) {
+ g.writeStartArray();
+ }
+ for (Object v : tuple) {
+ writeValue(v, DataType.findType(v), g, name, properties, op, depth);
+ }
+ if (writeStartArray) {
+ g.writeEndArray();
+ }
+ break;
+ case DataType.BAG:
+ DataBag bag = (DataBag) value;
+ g.writeStartArray();
+ for (Tuple t : bag) {
+ writeValue(t, DataType.TUPLE, g, name, properties, op, depth);
+ }
+ g.writeEndArray();
+ break;
+ }
+
+ }
+
+ private static boolean shouldWritePartialUpdate(Operation op, int depth) {
+ return op == Operation.UPDATE && depth == 1;
+ }
+
+ private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Operation op, int depth) throws IOException {
+ g.writeStartObject();
+ g.writeFieldName(PARTIAL_UPDATE_ASSIGN); // TODO: lookup field name in a property to determine correct operation
+ writeValue(value, type, g, name, properties, op, depth);
+ g.writeEndObject();
+ }
+
+ private static boolean shouldWriteTupleStart(Tuple tuple, String name, Properties properties) {
+ if (tuple.size() > 1 || properties == null) {
+ return true;
+ }
+ String simpleArrayFields = properties.getProperty(SIMPLE_ARRAY_FIELDS);
+ if (simpleArrayFields == null) {
+ return true;
+ }
+ if (simpleArrayFields.equals("*")) {
+ return false;
+ }
+ String[] fields = simpleArrayFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static boolean shouldCreateTensor(Map<Object, Object> map, String name, Properties properties) {
+ if (properties == null) {
+ return false;
+ }
+ String tensorFields = properties.getProperty(CREATE_TENSOR_FIELDS);
+ if (tensorFields == null) {
+ return false;
+ }
+ String[] fields = tensorFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static void writeTensor(Map<Object, Object> map, JsonGenerator g) throws IOException {
+ g.writeFieldName("cells");
+ g.writeStartArray();
+ for (Map.Entry<Object, Object> entry : map.entrySet()) {
+ String k = entry.getKey().toString();
+ Double v = Double.parseDouble(entry.getValue().toString());
+
+ g.writeStartObject();
+
+ // Write address
+ g.writeFieldName("address");
+ g.writeStartObject();
+
+ String[] dimensions = k.split(",");
+ for (String dimension : dimensions) {
+ if (dimension == null || dimension.isEmpty()) {
+ continue;
+ }
+ String[] address = dimension.split(":");
+ if (address.length != 2) {
+ throw new IllegalArgumentException("Malformed cell address: " + dimension);
+ }
+ String dim = address[0];
+ String label = address[1];
+ if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) {
+ throw new IllegalArgumentException("Malformed cell address: " + dimension);
+ }
+ g.writeFieldName(dim.trim());
+ g.writeString(label.trim());
+ }
+ g.writeEndObject();
+
+ // Write value
+ g.writeFieldName("value");
+ g.writeNumber(v);
+
+ g.writeEndObject();
+ }
+ g.writeEndArray();
+ }
+
+}
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java
new file mode 100644
index 00000000000..5faa6da94bd
--- /dev/null
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java
@@ -0,0 +1,111 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaHttpClient;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaQuerySchema;
+import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.UDFContext;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A Pig UDF to run a query against a Vespa cluster and return the
+ * results.
+ *
+ * @author lesters
+ */
+public class VespaQuery extends EvalFunc<DataBag> {
+
+ private final String PROPERTY_QUERY_TEMPLATE = "query";
+ private final String PROPERTY_QUERY_SCHEMA = "schema";
+
+ private final VespaConfiguration configuration;
+ private final Properties properties;
+ private final String queryTemplate;
+ private final String querySchema;
+
+ private VespaHttpClient httpClient;
+
+ public VespaQuery() {
+ this(null);
+ }
+
+ public VespaQuery(String... params) {
+ configuration = VespaConfiguration.get(UDFContext.getUDFContext().getJobConf(), null);
+ properties = VespaConfiguration.loadProperties(params);
+
+ queryTemplate = properties.getProperty(PROPERTY_QUERY_TEMPLATE);
+ if (queryTemplate == null || queryTemplate.isEmpty()) {
+ throw new IllegalArgumentException("Query template cannot be empty");
+ }
+
+ querySchema = properties.getProperty(PROPERTY_QUERY_SCHEMA, "rank:int,id:chararray");
+
+ }
+
+ @Override
+ public DataBag exec(Tuple input) throws IOException {
+ if (input == null || input.size() == 0) {
+ return null;
+ }
+ JsonNode jsonResult = queryVespa(input);
+ if (jsonResult == null) {
+ return null;
+ }
+ return createPigRepresentation(jsonResult);
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ return VespaQuerySchema.getPigSchema(querySchema);
+ }
+
+
+ private JsonNode queryVespa(Tuple input) throws IOException {
+ String url = createVespaQueryUrl(input);
+ if (url == null) {
+ return null;
+ }
+ String result = executeVespaQuery(url);
+ return parseVespaResultJson(result);
+ }
+
+
+ private String createVespaQueryUrl(Tuple input) throws IOException {
+ return TupleTools.toString(getInputSchema(), input, queryTemplate);
+ }
+
+
+ private String executeVespaQuery(String url) throws IOException {
+ if (httpClient == null) {
+ httpClient = new VespaHttpClient(configuration);
+ }
+ return httpClient.get(url);
+ }
+
+ private JsonNode parseVespaResultJson(String result) throws IOException {
+ return httpClient == null ? null : httpClient.parseResultJson(result);
+ }
+
+ private DataBag createPigRepresentation(JsonNode hits) {
+ DataBag bag = new SortedDataBag(null);
+ VespaQuerySchema querySchema = new VespaQuerySchema(this.querySchema);
+
+ for (int rank = 0; rank < hits.size(); ++rank) {
+ JsonNode hit = hits.get(rank);
+ Tuple tuple = querySchema.buildTuple(rank, hit);
+ bag.add(tuple);
+ }
+
+ return bag;
+ }
+
+
+
+
+}
diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
new file mode 100644
index 00000000000..4caa382223b
--- /dev/null
+++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
@@ -0,0 +1,186 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
+import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+import java.io.*;
+import java.util.Base64;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A small Pig UDF wrapper around the Vespa http client for
+ * feeding data into a Vespa endpoint.
+ *
+ * @author lesters
+ */
+public class VespaStorage extends StoreFunc {
+
+ private final boolean createDocOp;
+ private final String template;
+ private final VespaDocumentOperation.Operation operation;
+
+ private String signature = null;
+ private RecordWriter recordWriter = null;
+ private ResourceSchema resourceSchema = null;
+ private Properties properties = new Properties();
+
+ private static final String PROPERTY_CREATE_DOC_OP = "create-document-operation";
+ private static final String PROPERTY_ID_TEMPLATE = "docid";
+ private static final String PROPERTY_OPERATION = "operation";
+ private static final String PROPERTY_RESOURCE_SCHEMA = "resource_schema";
+
+ public VespaStorage() {
+ createDocOp = false;
+ template = null;
+ operation = null;
+ }
+
+ public VespaStorage(String... params) {
+ properties = VespaConfiguration.loadProperties(params);
+ createDocOp = Boolean.parseBoolean(properties.getProperty(PROPERTY_CREATE_DOC_OP, "false"));
+ operation = VespaDocumentOperation.Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put"));
+ template = properties.getProperty(PROPERTY_ID_TEMPLATE);
+ }
+
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ return new VespaOutputFormat(properties);
+ }
+
+
+ @Override
+ public void setStoreLocation(String endpoint, Job job) throws IOException {
+ properties.setProperty(VespaConfiguration.ENDPOINT, endpoint);
+ }
+
+
+ @Override
+ public void prepareToWrite(RecordWriter recordWriter) throws IOException {
+ this.recordWriter = recordWriter;
+ this.resourceSchema = getResourceSchema();
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void putNext(Tuple tuple) throws IOException {
+ if (tuple == null || tuple.size() == 0) {
+ return;
+ }
+
+ String data = null;
+ if (createDocOp) {
+ data = createDocumentOperation(tuple);
+ } else if (!tuple.isNull(0)) {
+ data = tuple.get(0).toString(); // assume single field with correctly formatted doc op.
+ }
+
+ if (data == null || data.length() == 0) {
+ return;
+ }
+
+ try {
+ recordWriter.write(0, data);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+
+ @Override
+ public void checkSchema(ResourceSchema resourceSchema) throws IOException {
+ setResourceSchema(resourceSchema);
+ }
+
+
+ @Override
+ public String relToAbsPathForStoreLocation(String endpoint, Path path) throws IOException {
+ return endpoint;
+ }
+
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String s) {
+ this.signature = s;
+ }
+
+
+ @Override
+ public void cleanupOnFailure(String s, Job job) throws IOException {
+ }
+
+
+ @Override
+ public void cleanupOnSuccess(String s, Job job) throws IOException {
+ }
+
+
+ private ResourceSchema getResourceSchema() throws IOException {
+ Properties properties = getUDFProperties();
+ return base64Deserialize(properties.getProperty(PROPERTY_RESOURCE_SCHEMA));
+ }
+
+
+ private void setResourceSchema(ResourceSchema schema) throws IOException {
+ Properties properties = getUDFProperties();
+ if (properties.getProperty(PROPERTY_RESOURCE_SCHEMA) == null) {
+ properties.setProperty(PROPERTY_RESOURCE_SCHEMA, base64Serialize(schema));
+ }
+ }
+
+
+ private Properties getUDFProperties() {
+ String[] context = { signature };
+ return UDFContext.getUDFContext().getUDFProperties(getClass(), context);
+ }
+
+
+ private String createDocumentOperation(Tuple tuple) throws IOException {
+ if (tuple == null || tuple.size() == 0) {
+ return null;
+ }
+ if (resourceSchema == null) {
+ return null;
+ }
+
+ Map<String, Object> fields = TupleTools.tupleMap(resourceSchema, tuple);
+ String docId = TupleTools.toString(fields, template);
+
+ return VespaDocumentOperation.create(operation, docId, fields, properties);
+ }
+
+
+ public static String base64Serialize(Object o) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(o);
+ }
+ return Base64.getEncoder().encodeToString(baos.toByteArray());
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public static <T extends Serializable> T base64Deserialize(String s) throws IOException {
+ Object ret;
+ byte[] data = Base64.getDecoder().decode(s);
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
+ try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+ ret = ois.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ return (T) ret;
+ }
+
+}
diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
new file mode 100644
index 00000000000..7c44b54efcb
--- /dev/null
+++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
@@ -0,0 +1,196 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.test.PathUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MapReduceTest {
+
+ protected static File hdfsBaseDir;
+ protected static FileSystem hdfs;
+ protected static Configuration conf;
+ protected static MiniDFSCluster cluster;
+
+ protected static Path metricsJsonPath;
+ protected static Path metricsCsvPath;
+
+ @BeforeClass
+ public static void setUp() throws IOException {
+ hdfsBaseDir = new File(PathUtils.getTestDir(MapReduceTest.class).getCanonicalPath());
+
+ conf = new HdfsConfiguration();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBaseDir.getAbsolutePath());
+ conf.set(VespaConfiguration.DRYRUN, "true");
+ conf.set(VespaConfiguration.ENDPOINT, "endpoint-does-not-matter-in-dryrun");
+
+ cluster = new MiniDFSCluster.Builder(conf).build();
+ hdfs = FileSystem.get(conf);
+
+ metricsJsonPath = new Path("metrics_json");
+ metricsCsvPath = new Path("metrics_csv");
+ copyToHdfs("src/test/resources/operations_data.json", metricsJsonPath, "data");
+ copyToHdfs("src/test/resources/tabular_data.csv", metricsCsvPath, "data");
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ Path testDir = new Path(hdfsBaseDir.getParent());
+ hdfs.delete(testDir, true);
+ cluster.shutdown();
+ LocalFileSystem localFileSystem = FileSystem.getLocal(conf);
+ localFileSystem.delete(testDir, true);
+ }
+
+ @Test
+ public void requireThatMapOnlyJobSucceeds() throws Exception {
+ Job job = Job.getInstance(conf);
+ job.setJarByClass(MapReduceTest.class);
+ job.setMapperClass(FeedMapper.class);
+ job.setOutputFormatClass(VespaOutputFormat.class);
+ job.setMapOutputValueClass(Text.class);
+
+ FileInputFormat.setInputPaths(job, metricsJsonPath);
+
+ boolean success = job.waitForCompletion(true);
+ assertTrue("Job Failed", success);
+
+ VespaCounters counters = VespaCounters.get(job);
+ assertEquals(10, counters.getDocumentsSent());
+ assertEquals(0, counters.getDocumentsFailed());
+ assertEquals(10, counters.getDocumentsOk());
+ }
+
+ @Test
+ public void requireThatMapReduceJobSucceeds() throws Exception {
+ Job job = Job.getInstance(conf);
+ job.setJarByClass(MapReduceTest.class);
+ job.setMapperClass(FeedMapper.class);
+ job.setOutputFormatClass(VespaOutputFormat.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setReducerClass(FeedReducer.class);
+ job.setNumReduceTasks(1);
+
+ FileInputFormat.setInputPaths(job, metricsJsonPath);
+
+ boolean success = job.waitForCompletion(true);
+ assertTrue("Job Failed", success);
+
+ VespaCounters counters = VespaCounters.get(job);
+ assertEquals(10, counters.getDocumentsSent());
+ assertEquals(0, counters.getDocumentsFailed());
+ assertEquals(10, counters.getDocumentsOk());
+ }
+
+
+ @Test
+ public void requireThatTransformMapJobSucceeds() throws Exception {
+ Job job = Job.getInstance(conf);
+ job.setJarByClass(MapReduceTest.class);
+ job.setMapperClass(ParsingMapper.class);
+ job.setOutputFormatClass(VespaOutputFormat.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setReducerClass(FeedReducer.class);
+ job.setNumReduceTasks(1);
+
+ FileInputFormat.setInputPaths(job, metricsCsvPath);
+
+ boolean success = job.waitForCompletion(true);
+ assertTrue("Job Failed", success);
+
+ VespaCounters counters = VespaCounters.get(job);
+ assertEquals(10, counters.getDocumentsSent());
+ assertEquals(0, counters.getDocumentsFailed());
+ assertEquals(10, counters.getDocumentsOk());
+ assertEquals(0, counters.getDocumentsSkipped());
+ }
+
+
+ private static void copyToHdfs(String localFile, Path hdfsDir, String hdfsName) throws IOException {
+ Path hdfsPath = new Path(hdfsDir, hdfsName);
+ FSDataOutputStream out = hdfs.create(hdfsPath);
+
+ try (InputStream in = new BufferedInputStream(new FileInputStream(localFile))) {
+ int len;
+ byte[] buffer = new byte[1024];
+ while ((len = in.read(buffer)) > 0) {
+ out.write(buffer, 0, len);
+ }
+ } finally {
+ out.close();
+ }
+ }
+
+ public static class FeedMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ context.write(key, value);
+ }
+ }
+
+ public static class FeedReducer extends Reducer<Object, Text, LongWritable, Text> {
+ public void reduce(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ context.write(key, value);
+ }
+ }
+
+ public static class ParsingMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ String line = value.toString();
+ if (line == null || line.length() == 0)
+ return;
+
+ StringTokenizer tokenizer = new StringTokenizer(line);
+ long date = Long.parseLong(tokenizer.nextToken());
+ String metricName = tokenizer.nextToken();
+ long metricValue = Long.parseLong(tokenizer.nextToken());
+ String application = tokenizer.nextToken();
+
+ String docid = "id:"+application+":metric::"+metricName+"-"+date;
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+
+ g.writeStartObject();
+ g.writeObjectFieldStart("fields");
+ g.writeNumberField("date", date);
+ g.writeStringField("name", metricName);
+ g.writeNumberField("value", metricValue);
+ g.writeStringField("application", application);
+ g.writeEndObject();
+ g.writeStringField("put", docid);
+ g.writeEndObject();
+ g.close();
+
+ context.write(key, new Text(out.toString()));
+ }
+ }
+
+
+}
diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
new file mode 100644
index 00000000000..9ac98ab5022
--- /dev/null
+++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
@@ -0,0 +1,257 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class VespaDocumentOperationTest {
+
+ @Test
+ public void requireThatUDFReturnsCorrectJson() throws Exception {
+ String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>");
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.path("fields");
+
+ // operation put is default
+ assertEquals("id:testapp:metrics::clicks-20160112", root.get("put").getTextValue());
+ assertEquals("testapp", fields.get("application").getTextValue());
+ assertEquals("clicks", fields.get("name").getTextValue());
+ assertEquals(3, fields.get("value").getIntValue());
+ }
+
+
+ @Test
+ public void requireThatUDFSupportsUpdateAssign() throws IOException {
+ String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "operation=update");
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.path("fields");
+
+ assertEquals("id:testapp:metrics::clicks-20160112", root.get("update").getTextValue());
+ assertEquals("testapp", fields.get("application").get("assign").getTextValue());
+ assertEquals("clicks", fields.get("name").get("assign").getTextValue());
+ assertEquals(3, fields.get("value").get("assign").getIntValue());
+ }
+
+
+ @Test
+ public void requireThatUDFReturnsNullForMissingConfig() throws Exception {
+ String json = getDocumentOperationJson();
+ assertNull(json);
+ }
+
+
+ @Test
+ public void requireThatUDFCorrectlyGeneratesRemoveOperation() throws Exception {
+ String json = getDocumentOperationJson("operation=remove", "docid=id:<application>:metrics::<name>-<date>");
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+
+ assertEquals("id:testapp:metrics::clicks-20160112", root.get("remove").getTextValue());
+ assertNull(fields);
+ }
+
+
+ @Test
+ public void requireThatUDFGeneratesComplexDataTypes() throws Exception {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ Tuple intTuple = TupleFactory.getInstance().newTuple();
+ int[] intArray = {1, 2, 3};
+ for (int i : intArray) { intTuple.append(i); }
+
+ Tuple stringTuple = TupleFactory.getInstance().newTuple();
+ String[] stringArray = {"a", "b", "c"};
+ for (String s : stringArray) { stringTuple.append(s); }
+
+ DataBag bag = new SortedDataBag(null);
+ bag.add(intTuple);
+ bag.add(stringTuple);
+
+ Map<String, Object> innerMap = new HashMap<String, Object>() {{
+ put("a", "string");
+ put("tuple", intTuple);
+ }};
+
+ DataByteArray bytes = new DataByteArray("testdata".getBytes());
+
+ Map<String, Object> outerMap = new HashMap<String, Object>() {{
+ put("string", "value");
+ put("int", 3);
+ put("float", 3.145);
+ put("bool", true);
+ put("byte", bytes);
+ put("map", innerMap);
+ put("bag", bag);
+ }};
+
+ addToTuple("map", DataType.MAP, outerMap, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty");
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+ JsonNode map = fields.get("map");
+
+ assertEquals("value", map.get("string").getTextValue());
+ assertEquals(3, map.get("int").getIntValue());
+ assertEquals(3.145, map.get("float").getDoubleValue(), 1e-6);
+ assertEquals(true, map.get("bool").getBooleanValue());
+ assertEquals("dGVzdGRhdGE=", map.get("byte").getTextValue());
+
+ assertEquals("string", map.get("map").get("a").getTextValue());
+ for (int i = 0; i < intArray.length; ++i) {
+ assertEquals(intArray[i], map.get("map").get("tuple").get(i).asInt());
+ }
+
+ JsonNode bagField = map.get("bag");
+ for (int i = 0; i < intArray.length; ++i) {
+ assertEquals(intArray[i], bagField.get(0).get(i).asInt());
+ }
+ for (int i = 0; i < stringArray.length; ++i) {
+ assertEquals(stringArray[i], bagField.get(1).get(i).asText());
+ }
+ }
+
+
+ @Test
+ public void requireThatSimpleArraysMustBeConfigured() throws Exception {
+ String[] stringArray = {"a", "b", "c"};
+ JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty"); // simple arrays not configured
+ // json: [["a"], ["b"], ["c"]]
+ assertEquals("a", array.get(0).get(0).asText());
+ assertEquals("b", array.get(1).get(0).asText());
+ assertEquals("c", array.get(2).get(0).asText());
+ }
+
+
+ @Test
+ public void requireThatSimpleArraysAreSupported() throws Exception {
+ String[] stringArray = {"a", "b", "c"};
+ JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=array");
+ // json: ["a", "b", "c"]
+ assertEquals("a", array.get(0).asText());
+ assertEquals("b", array.get(1).asText());
+ assertEquals("c", array.get(2).asText());
+ }
+
+
+ @Test
+ public void requireThatSimpleArraysCanBeConfiguredWithWildcard() throws Exception {
+ String[] stringArray = {"a", "b", "c"};
+ JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=*");
+ // json: ["a", "b", "c"]
+ assertEquals("a", array.get(0).asText());
+ assertEquals("b", array.get(1).asText());
+ assertEquals("c", array.get(2).asText());
+ }
+
+
+ @Test
+ public void requireThatMultipleSimpleArraysAreSupported() throws Exception {
+ String[] stringArray = {"a", "b", "c"};
+ JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=empty,array");
+ // json: ["a", "b", "c"]
+ assertEquals("a", array.get(0).asText());
+ assertEquals("b", array.get(1).asText());
+ assertEquals("c", array.get(2).asText());
+ }
+
+
+ private JsonNode setupSimpleArrayOperation(String name, String[] array, String... params) throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ DataBag bag = new SortedDataBag(null);
+ for (String s : array) {
+ Tuple stringTuple = TupleFactory.getInstance().newTuple();
+ stringTuple.append(s);
+ bag.add(stringTuple);
+ }
+ addToTuple(name, DataType.BAG, bag, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation(params);
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+ return fields.get(name);
+ }
+
+
+ @Test
+ public void requireThatUDFSupportsTensors() throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ // Please refer to the tensor format documentation
+
+ Map<String, Double> tensor = new HashMap<String, Double>() {{
+ put("x:label1,y:label2,z:label4", 2.0);
+ put("x:label3", 3.0);
+ }};
+
+ addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
+ addToTuple("tensor", DataType.MAP, tensor, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "create-tensor-fields=tensor");
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+ JsonNode tensorNode = fields.get("tensor");
+ JsonNode cells = tensorNode.get("cells");
+
+ assertEquals("label1", cells.get(0).get("address").get("x").asText());
+ assertEquals("label2", cells.get(0).get("address").get("y").asText());
+ assertEquals("label4", cells.get(0).get("address").get("z").asText());
+ assertEquals("label3", cells.get(1).get("address").get("x").asText());
+
+ assertEquals(2.0, cells.get(0).get("value").asDouble(), 1e-6);
+ assertEquals(3.0, cells.get(1).get("value").asDouble(), 1e-6);
+ }
+
+
+ private String getDocumentOperationJson(String... params) throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ addToTuple("application", DataType.CHARARRAY, "testapp", schema, tuple);
+ addToTuple("name", DataType.CHARARRAY, "clicks", schema, tuple);
+ addToTuple("date", DataType.CHARARRAY, "20160112", schema, tuple);
+ addToTuple("value", DataType.CHARARRAY, 3, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation(params);
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ return json;
+ }
+
+
+ private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) {
+ schema.add(new Schema.FieldSchema(alias, type));
+ tuple.append(value);
+ }
+
+
+}
diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
new file mode 100644
index 00000000000..9ba212293d0
--- /dev/null
+++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
@@ -0,0 +1,106 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.sun.net.httpserver.HttpServer;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaHttpClient;
+import com.yahoo.vespa.hadoop.util.MockQueryHandler;
+import junit.framework.Assert;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class VespaQueryTest {
+
+ @Test
+ public void requireThatQueriesAreReturnedCorrectly() throws Exception {
+ MockQueryHandler queryHandler = createQueryHandler();
+
+ final int port = 18901;
+ final String endpoint = "http://localhost:" + port;
+
+ HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
+ server.createContext("/", queryHandler);
+ server.start();
+
+ PigServer ps = setup("src/test/pig/query.pig", endpoint);
+
+ Iterator<Tuple> recommendations = ps.openIterator("recommendations");
+ while (recommendations.hasNext()) {
+ Tuple tuple = recommendations.next();
+
+ String userid = (String) tuple.get(0);
+ Integer rank = (Integer) tuple.get(1);
+ String docid = (String) tuple.get(2);
+ Double relevance = (Double) tuple.get(3);
+ String fieldId = (String) tuple.get(4);
+ String fieldContent = (String) tuple.get(5);
+
+ MockQueryHandler.MockQueryHit hit = queryHandler.getHit(userid, rank);
+ assertEquals(docid, hit.id);
+ assertEquals(relevance, hit.relevance, 1e-3);
+ assertEquals(fieldId, hit.fieldId);
+ assertEquals(fieldContent, hit.fieldContent);
+ }
+
+ if (server != null) {
+ server.stop(0);
+ }
+
+ }
+
+ private PigServer setup(String script, String endpoint) throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ Map<String, String> parameters = new HashMap<>();
+ parameters.put("ENDPOINT", endpoint);
+
+ PigServer ps = new PigServer(ExecType.LOCAL, conf);
+ ps.setBatchOn();
+ ps.registerScript(script, parameters);
+
+ return ps;
+ }
+
+ private MockQueryHandler createQueryHandler() {
+ MockQueryHandler queryHandler = new MockQueryHandler();
+
+ List<String> userIds = Arrays.asList("5", "104", "313");
+
+ int hitsPerUser = 3;
+ for (int i = 0; i < hitsPerUser * userIds.size(); ++i) {
+ String id = "" + (i+1);
+ String userId = userIds.get(i / hitsPerUser);
+ queryHandler.newHit().
+ setId("id::::" + id).
+ setRelevance(1.0 - (i % hitsPerUser) * 0.1).
+ setFieldSddocname("doctype").
+ setFieldId("" + id).
+ setFieldDate("2016060" + id).
+ setFieldContent("Content for user " + userId + " hit " + i % hitsPerUser + "...").
+ add(userId);
+ }
+
+ return queryHandler;
+ }
+
+}
diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
new file mode 100644
index 00000000000..33756b3ad31
--- /dev/null
+++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
@@ -0,0 +1,80 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+
+public class VespaStorageTest {
+
+ @Test
+ public void requireThatPremadeOperationsFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_operations.pig");
+ }
+
+
+ @Test
+ public void requireThatCreateOperationsFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_create_operations.pig");
+ }
+
+
+ @Test
+ public void requireThatCreateOperationsShortFormFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_create_operations_short_form.pig");
+ }
+
+
+ @Test
+ public void requireThatFeedVisitDataSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_visit_data.pig");
+ }
+
+ private PigServer setup(String script) throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.set(VespaConfiguration.DRYRUN, "true");
+ conf.set(VespaConfiguration.ENDPOINT, "dummy-endpoint");
+
+ // Parameter substitutions - can also be set by configuration
+ Map<String, String> parameters = new HashMap<>();
+ parameters.put("ENDPOINT", "endpoint-does-not-matter-in-dryrun,another-endpoint-that-does-not-matter");
+
+ PigServer ps = new PigServer(ExecType.LOCAL, conf);
+ ps.setBatchOn();
+ ps.registerScript(script, parameters);
+
+ return ps;
+ }
+
+ private void assertAllDocumentsOk(String script) throws Exception {
+ PigServer ps = setup(script);
+ List<ExecJob> jobs = ps.executeBatch();
+ PigStats stats = jobs.get(0).getStatistics();
+ for (JobStats js : stats.getJobGraph()) {
+ Counters hadoopCounters = ((MRJobStats)js).getHadoopCounters();
+ assertNotNull(hadoopCounters);
+ VespaCounters counters = VespaCounters.get(hadoopCounters);
+ assertEquals(10, counters.getDocumentsSent());
+ assertEquals(0, counters.getDocumentsFailed());
+ assertEquals(10, counters.getDocumentsOk());
+ }
+ }
+
+}
diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java
new file mode 100644
index 00000000000..f6977155b05
--- /dev/null
+++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java
@@ -0,0 +1,206 @@
+package com.yahoo.vespa.hadoop.util;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MockQueryHandler implements HttpHandler {
+
+ private final Map<String, List<MockQueryHit>> hitMap;
+
+ public MockQueryHandler() {
+ this.hitMap = new HashMap<>();
+ }
+
+ public void handle(HttpExchange t) throws IOException {
+ URI uri = t.getRequestURI();
+ String query = uri.getQuery();
+ String response = null;
+
+ // Parse query - extract "query" element
+ if (query != null) {
+ String params[] = query.split("[&]");
+ for (String param : params) {
+ int i = param.indexOf('=');
+ String name = param.substring(0, i);
+ String value = URLDecoder.decode(param.substring(i + 1), "UTF-8");
+
+ if ("query".equalsIgnoreCase(name)) {
+ response = getResponse(URLDecoder.decode(param.substring(i + 1), "UTF-8"));
+ }
+ }
+ }
+
+ t.sendResponseHeaders(200, response == null ? 0 : response.length());
+ OutputStream os = t.getResponseBody();
+ os.write(response == null ? "".getBytes() : response.getBytes());
+ os.close();
+
+ }
+
+ public MockQueryHit getHit(String query, Integer rank) {
+ if (!hitMap.containsKey(query)) {
+ return null;
+ }
+ if (rank >= hitMap.get(query).size()) {
+ return null;
+ }
+ return hitMap.get(query).get(rank);
+ }
+
+ public MockQueryHit newHit() {
+ return new MockQueryHit(this);
+ }
+
+ public void addHit(String query, MockQueryHit hit) {
+ if (!hitMap.containsKey(query)) {
+ hitMap.put(query, new ArrayList<>());
+ }
+ hitMap.get(query).add(hit);
+ }
+
+ private String getResponse(String query) throws IOException {
+ List<MockQueryHit> hits = hitMap.get(query);
+ if (hits == null) {
+ return null;
+ }
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+
+ writeResultStart(g, hits.size());
+ for (MockQueryHit hit : hits) {
+ writeHit(g, hit);
+ }
+ writeResultsEnd(g);
+ g.close();
+
+ return out.toString();
+ }
+
+ private void writeHit(JsonGenerator g, MockQueryHit hit) throws IOException {
+ g.writeStartObject();
+
+ g.writeFieldName("id");
+ g.writeString(hit.id);
+
+ g.writeFieldName("relevance");
+ g.writeNumber(hit.relevance);
+
+ g.writeFieldName("fields");
+ g.writeStartObject();
+
+ g.writeFieldName("sddocname");
+ g.writeString(hit.fieldSddocname);
+
+ g.writeFieldName("date");
+ g.writeString(hit.fieldDate);
+
+ g.writeFieldName("content");
+ g.writeString(hit.fieldContent);
+
+ g.writeFieldName("id");
+ g.writeString(hit.fieldId);
+
+ g.writeEndObject();
+ g.writeEndObject();
+ }
+
+ private void writeResultStart(JsonGenerator g, int count) throws IOException {
+ g.writeStartObject();
+ g.writeFieldName("root");
+
+ g.writeStartObject();
+
+ g.writeFieldName("id");
+ g.writeString("toplevel");
+
+ g.writeFieldName("relevance");
+ g.writeNumber(1);
+
+ g.writeFieldName("fields");
+ g.writeStartObject();
+ g.writeFieldName("totalCount");
+ g.writeNumber(count);
+ g.writeEndObject();
+
+ g.writeFieldName("coverage");
+ g.writeStartObject();
+ g.writeFieldName("coverage");
+ g.writeNumber(100);
+ // ... more stuff here usually
+ g.writeEndObject();
+
+ g.writeFieldName("children");
+ g.writeStartArray();
+ }
+
+ private void writeResultsEnd(JsonGenerator g) throws IOException {
+ g.writeEndArray();
+ g.writeEndObject();
+ g.writeEndObject();
+ }
+
+ public static class MockQueryHit {
+
+ private final MockQueryHandler handler;
+
+ public String id;
+ public Double relevance;
+ public String fieldSddocname;
+ public String fieldDate;
+ public String fieldContent;
+ public String fieldId;
+
+ private MockQueryHit(MockQueryHandler handler) {
+ this.handler = handler;
+ }
+
+ public void add(String query) {
+ handler.addHit(query, this);
+ }
+
+ public MockQueryHit setId(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public MockQueryHit setRelevance(Double relevance) {
+ this.relevance = relevance;
+ return this;
+ }
+
+ public MockQueryHit setFieldSddocname(String fieldSddocname) {
+ this.fieldSddocname = fieldSddocname;
+ return this;
+ }
+
+ public MockQueryHit setFieldDate(String fieldDate) {
+ this.fieldDate = fieldDate;
+ return this;
+ }
+
+ public MockQueryHit setFieldContent(String fieldContent) {
+ this.fieldContent = fieldContent;
+ return this;
+ }
+
+ public MockQueryHit setFieldId(String fieldId) {
+ this.fieldId = fieldId;
+ return this;
+ }
+ }
+
+}
diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
new file mode 100644
index 00000000000..001879cbdf8
--- /dev/null
+++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
@@ -0,0 +1,36 @@
+package com.yahoo.vespa.hadoop.util;
+
+import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TupleToolsTest {
+
+ @Test
+ public void requireThatTupleToStringHandlesSimpleTypes() throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
+ addToTuple("rank", DataType.INTEGER, 1, schema, tuple);
+
+ String template = "Id is <id> and rank is <rank>";
+ String result = TupleTools.toString(schema, tuple, template);
+
+ assertEquals("Id is 123 and rank is 1", result);
+ }
+
+
+ private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) {
+ schema.add(new Schema.FieldSchema(alias, type));
+ tuple.append(value);
+ }
+
+}
diff --git a/hadoop/src/test/pig/feed_create_operations.pig b/hadoop/src/test/pig/feed_create_operations.pig
new file mode 100644
index 00000000000..2186935b59a
--- /dev/null
+++ b/hadoop/src/test/pig/feed_create_operations.pig
@@ -0,0 +1,23 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Create valid Vespa put operations
+DEFINE VespaPutOperation
+ com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
+ 'operation=put',
+ 'docid=id:<application>:metrics::<name>-<date>'
+ );
+
+-- By default, VespaStorage assumes it's feeding valid Vespa operations
+DEFINE VespaStorage
+ com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load tabular data
+metrics = LOAD 'src/test/resources/tabular_data.csv' AS (date:chararray, name:chararray, value:int, application:chararray);
+
+-- Transform tabular data to a Vespa document operation JSON format
+metrics = FOREACH metrics GENERATE VespaPutOperation(*);
+
+-- Store into Vespa
+STORE metrics INTO '$ENDPOINT' USING VespaStorage();
+
+
diff --git a/hadoop/src/test/pig/feed_create_operations_short_form.pig b/hadoop/src/test/pig/feed_create_operations_short_form.pig
new file mode 100644
index 00000000000..348616f00ad
--- /dev/null
+++ b/hadoop/src/test/pig/feed_create_operations_short_form.pig
@@ -0,0 +1,18 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Transform tabular data to a Vespa document operation JSON format
+-- as part of storing the data.
+DEFINE VespaStorage
+ com.yahoo.vespa.hadoop.pig.VespaStorage(
+ 'create-document-operation=true',
+ 'operation=put',
+ 'docid=id:<application>:metrics::<name>-<date>'
+ );
+
+-- Load tabular data
+metrics = LOAD 'src/test/resources/tabular_data.csv' AS (date:chararray, name:chararray, value:int, application:chararray);
+
+-- Store into Vespa
+STORE metrics INTO '$ENDPOINT' USING VespaStorage();
+
+
diff --git a/hadoop/src/test/pig/feed_operations.pig b/hadoop/src/test/pig/feed_operations.pig
new file mode 100644
index 00000000000..327181d4410
--- /dev/null
+++ b/hadoop/src/test/pig/feed_operations.pig
@@ -0,0 +1,10 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Define short name for VespaStorage
+DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load data - one column for json data
+metrics = LOAD 'src/test/resources/operations_data.json' AS (data:chararray);
+
+-- Store into Vespa
+STORE metrics INTO '$ENDPOINT' USING VespaStorage(); \ No newline at end of file
diff --git a/hadoop/src/test/pig/feed_visit_data.pig b/hadoop/src/test/pig/feed_visit_data.pig
new file mode 100644
index 00000000000..14010c38336
--- /dev/null
+++ b/hadoop/src/test/pig/feed_visit_data.pig
@@ -0,0 +1,11 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Define short name for VespaStorage
+DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load data - one column for json data
+metrics = LOAD 'src/test/resources/visit_data.json' AS (data:chararray);
+
+-- Store into Vespa
+STORE metrics INTO '$ENDPOINT' USING VespaStorage();
+
diff --git a/hadoop/src/test/pig/query.pig b/hadoop/src/test/pig/query.pig
new file mode 100644
index 00000000000..70f53a992e2
--- /dev/null
+++ b/hadoop/src/test/pig/query.pig
@@ -0,0 +1,18 @@
+-- REGISTER vespa-hadoop.jar -- Not needed in tests
+
+-- Define Vespa query for retrieving blog posts
+DEFINE BlogPostRecommendations
+ com.yahoo.vespa.hadoop.pig.VespaQuery(
+ 'query=$ENDPOINT/search?query=<userid>&hits=100',
+ 'schema=rank:int,id:chararray,relevance:double,fields/id:chararray,fields/content:chararray'
+ );
+
+-- Load data from a local file
+users = LOAD 'src/test/resources/user_ids.csv' AS (userid:chararray);
+users = FILTER users BY userid IS NOT null;
+
+-- Run a set of queries against Vespa
+recommendations = FOREACH users GENERATE userid, FLATTEN(BlogPostRecommendations(*));
+
+-- Output recommendations
+DUMP recommendations;
diff --git a/hadoop/src/test/resources/operations_data.json b/hadoop/src/test/resources/operations_data.json
new file mode 100644
index 00000000000..5af436dbfe7
--- /dev/null
+++ b/hadoop/src/test/resources/operations_data.json
@@ -0,0 +1,10 @@
+{"put":"id:testapp:metric::clicks-2015110414","fields":{"date":"2015110414","name":"clicks","value":1,"application":"testapp"}}
+{"fields":{"date":"2015110416","name":"clicks","value":5,"application":"testapp"},"put":"id:testapp:metric::clicks-2015110416"}
+{"put":"id:testapp:metric::clicks-2015110415","fields":{"date":"2015110415","name":"clicks","value":2,"application":"testapp"}}
+{"put":"id:testapp:metric::clicks-2015110417","fields":{"date":"2015110417","name":"clicks","value":3,"application":"testapp"}}
+{"put":"id:testapp:metric::clicks-2015110418","fields":{"date":"2015110418","name":"clicks","value":6,"application":"testapp"}}
+{"put":"id:testapp:metric::clicks-2015110419","fields":{"date":"2015110419","name":"clicks","value":3,"application":"testapp"}}
+{"put":"id:testapp:metric::clicks-2015110420","fields":{"date":"2015110420","name":"clicks","value":4,"application":"testapp"}}
+{"put":"id:testapp:metric::clicks-2015110421","fields":{"date":"2015110421","name":"clicks","value":2,"application":"testapp"}}
+{"fields":{"date":"2015110422","name":"clicks","value":5,"application":"testapp"},"condition":"metrics==0","put":"id:testapp:metric::clicks-2015110422"}
+{"put":"id:testapp:metric::clicks-2015110423","fields":{"date":"2015110423","name":"clicks","value":1,"application":"testapp"}}
diff --git a/hadoop/src/test/resources/tabular_data.csv b/hadoop/src/test/resources/tabular_data.csv
new file mode 100644
index 00000000000..541597998e9
--- /dev/null
+++ b/hadoop/src/test/resources/tabular_data.csv
@@ -0,0 +1,11 @@
+2015110414 clicks 1 testapp
+2015110415 clicks 2 testapp
+2015110416 clicks 5 testapp
+2015110417 clicks 3 testapp
+2015110418 clicks 6 testapp
+2015110419 clicks 3 testapp
+2015110420 clicks 4 testapp
+2015110421 clicks 2 testapp
+2015110422 clicks 5 testapp
+2015110423 clicks 1 testapp
+
diff --git a/hadoop/src/test/resources/user_ids.csv b/hadoop/src/test/resources/user_ids.csv
new file mode 100644
index 00000000000..5875a3b9a7c
--- /dev/null
+++ b/hadoop/src/test/resources/user_ids.csv
@@ -0,0 +1,4 @@
+5
+104
+313
+
diff --git a/hadoop/src/test/resources/visit_data.json b/hadoop/src/test/resources/visit_data.json
new file mode 100644
index 00000000000..a48fc9cf1c0
--- /dev/null
+++ b/hadoop/src/test/resources/visit_data.json
@@ -0,0 +1,12 @@
+[
+{"id":"id:testapp:metric::clicks-2015110414","fields":{"date":"2015110414","name":"clicks","value":1,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110415","fields":{"date":"2015110415","name":"clicks","value":2,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110416","fields":{"date":"2015110416","name":"clicks","value":4,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110417","fields":{"date":"2015110417","name":"clicks","value":3,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110418","fields":{"date":"2015110418","name":"clicks","value":6,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110419","fields":{"date":"2015110419","name":"clicks","value":3,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110420","fields":{"date":"2015110420","name":"clicks","value":4,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110421","fields":{"date":"2015110421","name":"clicks","value":2,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110422","fields":{"date":"2015110422","name":"clicks","value":7,"application":"testapp"}},
+{"id":"id:testapp:metric::clicks-2015110423","fields":{"date":"2015110423","name":"clicks","value":1,"application":"testapp"}}
+]