author    Lester Solbakken <lesters@yahoo-inc.com>  2017-03-17 12:29:41 +0100
committer Lester Solbakken <lesters@yahoo-inc.com>  2017-03-17 12:29:41 +0100
commit    ef775d57f273a69f0de2cd52518cbd9260e55eac
tree      5b89780e05f24c70b0158968fda5225822df9933 /vespa-hadoop/src/main
parent    b09d7deb64e5a723c5a052dd2b1db225f632405f
Rename hadoop -> vespa-hadoop
Diffstat (limited to 'vespa-hadoop/src/main')
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java        36
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java           51
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java          222
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java  98
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java             69
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java    187
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java         104
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java       101
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java      113
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java           363
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java                       113
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java             62
-rw-r--r--  vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java                     187
13 files changed, 1706 insertions, 0 deletions
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java
new file mode 100644
index 00000000000..cd0f65496d0
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java
@@ -0,0 +1,36 @@
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * The output committer describes how task output is committed for a
+ * Map-Reduce job. Not currently used, but part of the Hadoop protocol since 2.7.
+ *
+ * @author lesters
+ */
+public class VespaOutputCommitter extends OutputCommitter {
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+ }
+}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
new file mode 100644
index 00000000000..720a6adf477
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java
@@ -0,0 +1,51 @@
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import org.apache.hadoop.mapreduce.*;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * An output specification for writing to Vespa instances in a Map-Reduce job.
+ * Mainly returns an instance of a {@link VespaRecordWriter} that does the
+ * actual feeding to Vespa.
+ *
+ * @author lesters
+ */
+@SuppressWarnings("rawtypes")
+public class VespaOutputFormat extends OutputFormat {
+
+ private final Properties configOverride;
+
+ public VespaOutputFormat() {
+ super();
+ this.configOverride = null;
+ }
+
+ public VespaOutputFormat(Properties configOverride) {
+ super();
+ this.configOverride = configOverride;
+ }
+
+
+ @Override
+ public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+ VespaCounters counters = VespaCounters.get(context);
+ VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride);
+ return new VespaRecordWriter(configuration, counters);
+ }
+
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ return new VespaOutputCommitter();
+ }
+
+
+ @Override
+ public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+ }
+
+}
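
For reference, a minimal sketch of wiring this output format into a plain MapReduce job; the job name, endpoint and driver class are illustrative placeholders, not part of this commit:

    import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
    import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class FeedJobSetup {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // vespa.feed.endpoint; a comma-separated list adds one cluster per endpoint
            conf.set(VespaConfiguration.ENDPOINT, "vespa-host.example.com");
            Job job = Job.getInstance(conf, "vespa-feed");
            job.setOutputFormatClass(VespaOutputFormat.class);
            // set input format and mapper as usual, then job.waitForCompletion(true)
        }
    }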
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
new file mode 100644
index 00000000000..8072e99e2ab
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
@@ -0,0 +1,222 @@
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Random;
+import java.util.StringTokenizer;
+import java.util.logging.Logger;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.FactoryConfigurationError;
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation;
+import com.yahoo.vespa.http.client.FeedClient;
+import com.yahoo.vespa.http.client.FeedClientFactory;
+import com.yahoo.vespa.http.client.Result;
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import com.yahoo.vespa.http.client.config.FeedParams.DataFormat;
+import com.yahoo.vespa.http.client.config.SessionParams;
+
+/**
+ * VespaRecordWriter sends the output &lt;key, value&gt; to one or more Vespa
+ * endpoints.
+ *
+ * @author lesters
+ */
+@SuppressWarnings("rawtypes")
+public class VespaRecordWriter extends RecordWriter {
+
+ private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName());
+
+ private boolean initialized = false;
+ private FeedClient feedClient;
+
+ private final VespaCounters counters;
+ private final VespaConfiguration configuration;
+ private final int progressInterval;
+
+ VespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) {
+ this.counters = counters;
+ this.configuration = configuration;
+ this.progressInterval = configuration.progressInterval();
+ }
+
+
+ @Override
+ public void write(Object key, Object data) throws IOException, InterruptedException {
+ if (!initialized) {
+ initialize();
+ }
+
+ String doc = data.toString().trim();
+
+ // Parse data to find document id - if none found, skip this write
+ String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc)
+ : findDocIdFromXml(doc);
+        if (docId != null && docId.length() > 0) {
+ feedClient.stream(docId, doc);
+ counters.incrementDocumentsSent(1);
+ } else {
+ counters.incrementDocumentsSkipped(1);
+ }
+
+        if (progressInterval > 0 && counters.getDocumentsSent() % progressInterval == 0) {
+ String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)",
+ counters.getDocumentsSent(),
+ counters.getDocumentsOk(),
+ counters.getDocumentsFailed(),
+ counters.getDocumentsSkipped());
+ log.info(progress);
+ }
+
+ }
+
+
+ @Override
+ public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ if (feedClient != null) {
+ feedClient.close();
+ }
+ }
+
+
+ private void initialize() {
+        if (!configuration.dryrun() && configuration.randomStartupSleepMs() > 0) {
+            int delay = new Random().nextInt(configuration.randomStartupSleepMs());
+ log.info("VespaStorage: Delaying startup by " + delay + " ms");
+ try {
+ Thread.sleep(delay);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+ }
+
+ ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder();
+ connParamsBuilder.setDryRun(configuration.dryrun());
+ connParamsBuilder.setUseCompression(configuration.useCompression());
+ connParamsBuilder.setEnableV3Protocol(configuration.useV3Protocol());
+ connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections());
+ connParamsBuilder.setMaxRetries(configuration.numRetries());
+ if (configuration.proxyHost() != null) {
+ connParamsBuilder.setProxyHost(configuration.proxyHost());
+ }
+ if (configuration.proxyPort() >= 0) {
+ connParamsBuilder.setProxyPort(configuration.proxyPort());
+ }
+
+ FeedParams.Builder feedParamsBuilder = new FeedParams.Builder();
+ feedParamsBuilder.setDataFormat(configuration.dataFormat());
+ feedParamsBuilder.setRoute(configuration.route());
+ feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs());
+ feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests());
+
+ SessionParams.Builder sessionParams = new SessionParams.Builder();
+ sessionParams.setThrottlerMinSize(configuration.throttlerMinSize());
+ sessionParams.setConnectionParams(connParamsBuilder.build());
+ sessionParams.setFeedParams(feedParamsBuilder.build());
+
+ String endpoints = configuration.endpoint();
+ StringTokenizer tokenizer = new StringTokenizer(endpoints, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String endpoint = tokenizer.nextToken().trim();
+ sessionParams.addCluster(new Cluster.Builder().addEndpoint(
+ Endpoint.create(endpoint, 4080, false)
+ ).build());
+ }
+
+ ResultCallback resultCallback = new ResultCallback(counters);
+ feedClient = FeedClientFactory.create(sessionParams.build(), resultCallback);
+
+ initialized = true;
+ log.info("VespaStorage configuration:\n" + configuration.toString());
+ }
+
+ private String findDocIdFromXml(String xml) {
+ try {
+ XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml));
+ while (eventReader.hasNext()) {
+ XMLEvent event = eventReader.nextEvent();
+ if (event.getEventType() == XMLEvent.START_ELEMENT) {
+ StartElement element = event.asStartElement();
+ String elementName = element.getName().getLocalPart();
+ if (VespaDocumentOperation.Operation.valid(elementName)) {
+ return element.getAttributeByName(QName.valueOf("documentid")).getValue();
+ }
+ }
+ }
+ } catch (XMLStreamException | FactoryConfigurationError e) {
+            // no document id found; return null, as in the JSON case
+ return null;
+ }
+ return null;
+ }
+
+ private String findDocId(String json) throws IOException {
+ JsonFactory factory = new JsonFactory();
+ try(JsonParser parser = factory.createParser(json)) {
+ if (parser.nextToken() != JsonToken.START_OBJECT) {
+ return null;
+ }
+ while (parser.nextToken() != JsonToken.END_OBJECT) {
+ String fieldName = parser.getCurrentName();
+ parser.nextToken();
+ if (VespaDocumentOperation.Operation.valid(fieldName)) {
+ String docId = parser.getText();
+ return docId;
+ } else {
+ parser.skipChildren();
+ }
+ }
+ } catch (JsonParseException ex) {
+ return null;
+ }
+ return null;
+ }
+
+
+ static class ResultCallback implements FeedClient.ResultCallback {
+ final VespaCounters counters;
+
+ public ResultCallback(VespaCounters counters) {
+ this.counters = counters;
+ }
+
+ @Override
+ public void onCompletion(String docId, Result documentResult) {
+ if (!documentResult.isSuccess()) {
+ counters.incrementDocumentsFailed(1);
+ StringBuilder sb = new StringBuilder();
+ sb.append("Problems with docid ");
+ sb.append(docId);
+ sb.append(": ");
+ List<Result.Detail> details = documentResult.getDetails();
+ for (Result.Detail detail : details) {
+ sb.append(detail.toString());
+ sb.append(" ");
+ }
+ log.warning(sb.toString());
+ return;
+ }
+ counters.incrementDocumentsOk(1);
+ }
+
+ }
+
+}
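
For JSON feeds, findDocId above expects each record to be a Vespa document operation whose document id is the value of the operation field; the namespace and document type here are illustrative:

    {"put": "id:mynamespace:mydoctype::doc1", "fields": {"title": "Hello"}}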
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java
new file mode 100644
index 00000000000..d7bdc592fd5
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java
@@ -0,0 +1,98 @@
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+
+/**
+ * Simple JSON reader which splits the input file along JSON object boundaries.
+ *
+ * There are two cases handled here:
+ * 1. Each line contains a JSON object, i.e. { ... }
+ * 2. The file contains an array of objects with arbitrary line breaks, i.e. [ {...}, {...} ]
+ *
+ * Not suitable for cases where you want to extract objects from some other arbitrary structure.
+ *
+ * TODO: Support config which points to an array in the JSON as the starting point for
+ * object extraction, as done in VespaHttpClient.parseResultJson, i.e. support a rootNode config.
+ *
+ * @author lesters
+ */
+public class VespaSimpleJsonInputFormat extends FileInputFormat<Text, NullWritable> {
+
+ @Override
+ public RecordReader<Text, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ return new VespaJsonRecordReader();
+ }
+
+ public static class VespaJsonRecordReader extends RecordReader<Text, NullWritable> {
+ private long remaining;
+ private JsonParser parser;
+ private Text currentKey;
+ private NullWritable currentValue = NullWritable.get();
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ FileSplit fileSplit = (FileSplit) split;
+ FSDataInputStream stream = FileSystem.get(context.getConfiguration()).open(fileSplit.getPath());
+ if (fileSplit.getStart() != 0) {
+ stream.seek(fileSplit.getStart());
+ }
+
+ remaining = fileSplit.getLength();
+
+ JsonFactory factory = new JsonFactory();
+ parser = factory.createParser(new BufferedInputStream(stream));
+ parser.setCodec(new ObjectMapper());
+ parser.nextToken();
+ if (parser.currentToken() == JsonToken.START_ARRAY) {
+ parser.nextToken();
+ }
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+            // per the RecordReader contract, return true only if a key/value pair was read
+            if (parser.currentToken() != JsonToken.START_OBJECT) {
+                return false;
+            }
+            currentKey = new Text(parser.readValueAsTree().toString());
+            parser.nextToken();
+            return true;
+ }
+
+ @Override
+ public Text getCurrentKey() throws IOException, InterruptedException {
+ return currentKey;
+ }
+
+ @Override
+ public NullWritable getCurrentValue() throws IOException, InterruptedException {
+ return currentValue;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+            return remaining > 0 ? (float) parser.getCurrentLocation().getByteOffset() / remaining : 1.0f;
+ }
+
+ @Override
+ public void close() throws IOException {
+ parser.close();
+ }
+ }
+
+}
+
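
The two supported input layouts thus look like this (example data); one object per line:

    {"put": "id:test:doc::1", "fields": {"title": "first"}}
    {"put": "id:test:doc::2", "fields": {"title": "second"}}

or a single array of objects with arbitrary line breaks:

    [
      {"put": "id:test:doc::1", "fields": {"title": "first"}},
      {"put": "id:test:doc::2", "fields": {"title": "second"}}
    ]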
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java
new file mode 100644
index 00000000000..1d11a3f9fdd
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java
@@ -0,0 +1,69 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
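+/**
+ * Utilities for converting Pig tuples to field maps and for substituting
+ * &lt;fieldname&gt; tokens in templates with values from those maps.
+ */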
+public class TupleTools {
+
+ private static final Pattern pattern = Pattern.compile("<([\\w]+)>");
+
+ public static Map<String, Object> tupleMap(Schema schema, Tuple tuple) throws IOException {
+ Map<String, Object> tupleMap = new HashMap<>((int)Math.ceil(tuple.size() / 0.75) + 1);
+ List<Schema.FieldSchema> schemas = schema.getFields();
+ for (int i = 0; i < schemas.size(); i++) {
+ Schema.FieldSchema field = schemas.get(i);
+ String alias = field.alias;
+ Object value = tuple.get(i);
+ if (value != null) {
+ tupleMap.put(alias, value);
+ }
+ }
+ return tupleMap;
+ }
+
+ public static Map<String, Object> tupleMap(ResourceSchema schema, Tuple tuple) throws IOException {
+ Map<String, Object> tupleMap = new HashMap<>((int)Math.ceil(tuple.size() / 0.75) + 1);
+ ResourceSchema.ResourceFieldSchema[] schemas = schema.getFields();
+ for (int i = 0; i < schemas.length; i++) {
+ ResourceSchema.ResourceFieldSchema field = schemas[i];
+ String alias = field.getName();
+ Object value = tuple.get(i);
+ if (value != null) {
+ tupleMap.put(alias, value);
+ }
+ }
+ return tupleMap;
+ }
+
+ public static String toString(Schema schema, Tuple tuple, String template) throws IOException {
+ return toString(tupleMap(schema, tuple), template);
+ }
+
+ public static String toString(Map<String,Object> fields, String template) {
+ if (template == null || template.length() == 0) {
+ return template;
+ }
+ if (fields == null || fields.size() == 0) {
+ return template;
+ }
+
+ Matcher m = pattern.matcher(template);
+ StringBuffer sb = new StringBuffer();
+ while (m.find()) {
+ Object value = fields.get(m.group(1));
+ String replacement = value != null ? value.toString() : m.group(0);
+ m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
+ }
+ m.appendTail(sb);
+ return sb.toString();
+ }
+
+}
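
A sketch of the template mechanism (field name and template are illustrative): tokens of the form <alias> are replaced by the matching field value, and unmatched tokens are left untouched:

    import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
    import java.util.HashMap;
    import java.util.Map;

    public class TemplateDemo {
        public static void main(String[] args) {
            Map<String, Object> fields = new HashMap<>();
            fields.put("guid", "abc-123");
            String docId = TupleTools.toString(fields, "id:mynamespace:mydoctype::<guid>");
            System.out.println(docId);  // id:mynamespace:mydoctype::abc-123
        }
    }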
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
new file mode 100644
index 00000000000..6e35e5e465a
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java
@@ -0,0 +1,187 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import com.yahoo.vespa.http.client.config.FeedParams;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Properties;
+
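+/**
+ * Holds the vespa.feed.* and vespa.query.* settings, read from the Hadoop
+ * configuration with optional per-job property overrides.
+ */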
+public class VespaConfiguration {
+
+ public static final String ENDPOINT = "vespa.feed.endpoint";
+ public static final String PROXY_HOST = "vespa.feed.proxy.host";
+ public static final String PROXY_PORT = "vespa.feed.proxy.port";
+ public static final String DRYRUN = "vespa.feed.dryrun";
+ public static final String USE_COMPRESSION = "vespa.feed.usecompression";
+ public static final String DATA_FORMAT = "vespa.feed.data.format";
+ public static final String PROGRESS_REPORT = "vespa.feed.progress.interval";
+ public static final String V3_PROTOCOL = "vespa.feed.v3.protocol";
+ public static final String CONNECTIONS = "vespa.feed.connections";
+ public static final String THROTTLER_MIN_SIZE = "vespa.feed.throttler.min.size";
+ public static final String QUERY_CONNECTION_TIMEOUT = "vespa.query.connection.timeout";
+ public static final String ROUTE = "vespa.feed.route";
+ public static final String MAX_SLEEP_TIME_MS = "vespa.feed.max.sleep.time.ms";
+ public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests";
+ public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms";
+ public static final String NUM_RETRIES = "vespa.feed.num.retries";
+
+ private final Configuration conf;
+ private final Properties override;
+
+ private VespaConfiguration(Configuration conf, Properties override) {
+ this.conf = conf;
+ this.override = override;
+ }
+
+
+ public static VespaConfiguration get(Configuration conf, Properties override) {
+ return new VespaConfiguration(conf, override);
+ }
+
+
+ public String endpoint() {
+ return getString(ENDPOINT);
+ }
+
+
+ public String proxyHost() {
+ return getString(PROXY_HOST);
+ }
+
+
+ public int proxyPort() {
+ return getInt(PROXY_PORT, 4080);
+ }
+
+
+ public boolean dryrun() {
+ return getBoolean(DRYRUN, false);
+ }
+
+
+ public boolean useCompression() {
+ return getBoolean(USE_COMPRESSION, true);
+ }
+
+
+ public boolean useV3Protocol() {
+ return getBoolean(V3_PROTOCOL, true);
+ }
+
+
+ public int numConnections() {
+ return getInt(CONNECTIONS, 2);
+ }
+
+
+ public int throttlerMinSize() {
+ return getInt(THROTTLER_MIN_SIZE, 0);
+ }
+
+
+ public int queryConnectionTimeout() {
+ return getInt(QUERY_CONNECTION_TIMEOUT, 10000);
+ }
+
+
+ public String route() {
+ return getString(ROUTE);
+ }
+
+
+ public int maxSleepTimeMs() {
+ return getInt(MAX_SLEEP_TIME_MS, 10000);
+ }
+
+
+ public int maxInFlightRequests() {
+ return getInt(MAX_IN_FLIGHT_REQUESTS, 1000);
+ }
+
+
+    public int randomStartupSleepMs() {
+ return getInt(RANDOM_STARTUP_SLEEP, 30000);
+ }
+
+
+ public int numRetries() {
+ return getInt(NUM_RETRIES, 100);
+ }
+
+
+ public FeedParams.DataFormat dataFormat() {
+ String format = getString(DATA_FORMAT);
+ if ("xml".equalsIgnoreCase(format)) {
+ return FeedParams.DataFormat.XML_UTF8;
+ }
+ return FeedParams.DataFormat.JSON_UTF8;
+ }
+
+
+ public int progressInterval() {
+ return getInt(PROGRESS_REPORT, 1000);
+ }
+
+
+ private String getString(String name) {
+ if (override != null && override.containsKey(name)) {
+ return override.getProperty(name);
+ }
+ return conf != null ? conf.get(name) : null;
+ }
+
+
+ private int getInt(String name, int defaultValue) {
+ if (override != null && override.containsKey(name)) {
+ return Integer.parseInt(override.getProperty(name));
+ }
+ return conf != null ? conf.getInt(name, defaultValue) : defaultValue;
+ }
+
+
+ private boolean getBoolean(String name, boolean defaultValue) {
+ if (override != null && override.containsKey(name)) {
+ return Boolean.parseBoolean(override.getProperty(name));
+ }
+ return conf != null ? conf.getBoolean(name, defaultValue) : defaultValue;
+
+ }
+
+ public static Properties loadProperties(String... params) {
+ Properties properties = new Properties();
+ if (params != null) {
+ for (String s : params) {
+ try {
+ properties.load(new StringReader(s));
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ }
+ return properties;
+ }
+
+
+    @Override
+    public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(ENDPOINT + ": " + endpoint() + "\n");
+ sb.append(PROXY_HOST + ": " + proxyHost() + "\n");
+ sb.append(PROXY_PORT + ": " + proxyPort() + "\n");
+ sb.append(DRYRUN + ": " + dryrun() +"\n");
+ sb.append(USE_COMPRESSION + ": " + useCompression() +"\n");
+ sb.append(DATA_FORMAT + ": " + dataFormat() +"\n");
+ sb.append(PROGRESS_REPORT + ": " + progressInterval() +"\n");
+ sb.append(V3_PROTOCOL + ": " + useV3Protocol() +"\n");
+ sb.append(CONNECTIONS + ": " + numConnections() +"\n");
+ sb.append(THROTTLER_MIN_SIZE + ": " + throttlerMinSize() +"\n");
+ sb.append(QUERY_CONNECTION_TIMEOUT + ": " + queryConnectionTimeout() +"\n");
+ sb.append(ROUTE + ": " + route() +"\n");
+ sb.append(MAX_SLEEP_TIME_MS + ": " + maxSleepTimeMs() +"\n");
+ sb.append(MAX_IN_FLIGHT_REQUESTS + ": " + maxInFlightRequests() +"\n");
+        sb.append(RANDOM_STARTUP_SLEEP + ": " + randomStartupSleepMs() + "\n");
+ sb.append(NUM_RETRIES + ": " + numRetries() +"\n");
+ return sb.toString();
+ }
+
+}
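
A minimal sketch of how per-job overrides compose with the Hadoop configuration (the endpoint value is a placeholder):

    import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
    import org.apache.hadoop.conf.Configuration;
    import java.util.Properties;

    public class ConfigDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set(VespaConfiguration.ENDPOINT, "vespa-host.example.com");
            // per-job properties take precedence over the Hadoop configuration
            Properties override = VespaConfiguration.loadProperties("vespa.feed.dryrun=true");
            VespaConfiguration config = VespaConfiguration.get(conf, override);
            System.out.println(config);  // dumps all effective settings
        }
    }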
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java
new file mode 100644
index 00000000000..dbe47c23814
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java
@@ -0,0 +1,104 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
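+/**
+ * Wraps the Hadoop counters that track documents sent, ok, failed and
+ * skipped during feeding.
+ */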
+public class VespaCounters {
+
+ public static final String GROUP = "Vespa Feed Counters";
+ public static final String DOCS_OK = "Documents ok";
+ public static final String DOCS_SENT = "Documents sent";
+ public static final String DOCS_FAILED = "Documents failed";
+ public static final String DOCS_SKIPPED = "Documents skipped";
+
+ private final Counter documentsSent;
+ private final Counter documentsOk;
+ private final Counter documentsFailed;
+ private final Counter documentsSkipped;
+
+
+ private VespaCounters(Job job) throws IOException {
+ Counters counters = job.getCounters();
+ documentsSent = counters.findCounter(GROUP, DOCS_SENT);
+ documentsOk = counters.findCounter(GROUP, DOCS_OK);
+ documentsFailed = counters.findCounter(GROUP, DOCS_FAILED);
+ documentsSkipped = counters.findCounter(GROUP, DOCS_SKIPPED);
+ }
+
+
+ private VespaCounters(TaskAttemptContext context) {
+ documentsSent = context.getCounter(GROUP, DOCS_SENT);
+ documentsOk = context.getCounter(GROUP, DOCS_OK);
+ documentsFailed = context.getCounter(GROUP, DOCS_FAILED);
+ documentsSkipped = context.getCounter(GROUP, DOCS_SKIPPED);
+ }
+
+
+ private VespaCounters(org.apache.hadoop.mapred.Counters counters) {
+ documentsSent = counters.findCounter(GROUP, DOCS_SENT);
+ documentsOk = counters.findCounter(GROUP, DOCS_OK);
+ documentsFailed = counters.findCounter(GROUP, DOCS_FAILED);
+ documentsSkipped = counters.findCounter(GROUP, DOCS_SKIPPED);
+ }
+
+
+ public static VespaCounters get(Job job) throws IOException {
+ return new VespaCounters(job);
+ }
+
+
+ public static VespaCounters get(TaskAttemptContext context) {
+ return new VespaCounters(context);
+ }
+
+
+ public static VespaCounters get(org.apache.hadoop.mapred.Counters counters) {
+        return new VespaCounters(counters);
+    }
+
+
+ public long getDocumentsSent() {
+ return documentsSent.getValue();
+ }
+
+
+ public void incrementDocumentsSent(long incr) {
+ documentsSent.increment(incr);
+ }
+
+
+ public long getDocumentsOk() {
+ return documentsOk.getValue();
+ }
+
+
+ public void incrementDocumentsOk(long incr) {
+ documentsOk.increment(incr);
+ }
+
+
+ public long getDocumentsFailed() {
+ return documentsFailed.getValue();
+ }
+
+
+ public void incrementDocumentsFailed(long incr) {
+ documentsFailed.increment(incr);
+ }
+
+
+ public long getDocumentsSkipped() {
+ return documentsSkipped.getValue();
+ }
+
+
+ public void incrementDocumentsSkipped(long incr) {
+ documentsSkipped.increment(incr);
+ }
+
+}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java
new file mode 100644
index 00000000000..50a089c8b45
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java
@@ -0,0 +1,101 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Scanner;
+
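+/**
+ * A thin wrapper around the Apache HTTP client for running queries
+ * against a Vespa endpoint and extracting hits from the JSON result.
+ */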
+public class VespaHttpClient {
+
+ private final HttpClient httpClient;
+
+ public VespaHttpClient() {
+ this(null);
+ }
+
+ public VespaHttpClient(VespaConfiguration configuration) {
+ httpClient = createClient(configuration);
+ }
+
+ public String get(String url) throws IOException {
+ HttpGet httpGet = new HttpGet(url);
+ HttpResponse httpResponse = httpClient.execute(httpGet);
+
+ HttpEntity entity = httpResponse.getEntity();
+ InputStream is = entity.getContent();
+
+        String result = "";
+        try (Scanner scanner = new Scanner(is, "UTF-8").useDelimiter("\\A")) {
+            if (scanner.hasNext()) {
+                result = scanner.next();
+            }
+        }
+ EntityUtils.consume(entity);
+
+ if (httpResponse.getStatusLine().getStatusCode() != 200) {
+ return null;
+ }
+
+ return result;
+ }
+
+ public JsonNode parseResultJson(String json, String rootNode) throws IOException {
+ if (json == null || json.isEmpty()) {
+ return null;
+ }
+ if (rootNode == null || rootNode.isEmpty()) {
+ return null;
+ }
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode node = m.readTree(json);
+ if (node != null) {
+ String[] path = rootNode.split("/");
+ for (String p : path) {
+ node = node.get(p);
+
+ if (node == null) {
+ return null;
+ }
+
+ // if node is an array, return the first node that has the correct path
+ if (node.isArray()) {
+ for (int i = 0; i < node.size(); ++i) {
+ JsonNode n = node.get(i);
+ if (n.has(p)) {
+ node = n;
+ break;
+ }
+ }
+ }
+
+ }
+ }
+ return node;
+ }
+
+ private HttpClient createClient(VespaConfiguration configuration) {
+ HttpClientBuilder clientBuilder = HttpClientBuilder.create();
+
+ RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+ if (configuration != null) {
+ requestConfigBuilder.setSocketTimeout(configuration.queryConnectionTimeout());
+ requestConfigBuilder.setConnectTimeout(configuration.queryConnectionTimeout());
+ if (configuration.proxyHost() != null) {
+ requestConfigBuilder.setProxy(new HttpHost(configuration.proxyHost(), configuration.proxyPort()));
+ }
+ }
+ clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
+ return clientBuilder.build();
+ }
+
+}
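
A small sketch of parseResultJson against the default Vespa result layout; the sample JSON is illustrative:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.yahoo.vespa.hadoop.mapreduce.util.VespaHttpClient;

    public class ParseDemo {
        public static void main(String[] args) throws Exception {
            VespaHttpClient client = new VespaHttpClient();
            String json = "{\"root\": {\"children\": [{\"id\": \"doc1\", \"relevance\": 0.5}]}}";
            // "root/children" is the default root node used by VespaQuery
            JsonNode hits = client.parseResultJson(json, "root/children");
            System.out.println(hits.get(0).get("id").asText());  // doc1
        }
    }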
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java
new file mode 100644
index 00000000000..0208b4165d3
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java
@@ -0,0 +1,113 @@
+package com.yahoo.vespa.hadoop.mapreduce.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
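+/**
+ * Parses a schema definition of the form "alias:type,..." and builds
+ * Pig tuples from Vespa query result hits accordingly.
+ */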
+public class VespaQuerySchema implements Iterable<VespaQuerySchema.AliasTypePair> {
+
+ private final List<AliasTypePair> tupleSchema = new ArrayList<>();
+
+ public VespaQuerySchema(String schema) {
+ for (String e : schema.split(",")) {
+ String[] pair = e.split(":");
+ String alias = pair[0].trim();
+ String type = pair[1].trim();
+ tupleSchema.add(new AliasTypePair(alias, type));
+ }
+ }
+
+ public Tuple buildTuple(int rank, JsonNode hit) {
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ for (VespaQuerySchema.AliasTypePair tupleElement : tupleSchema) {
+ String alias = tupleElement.getAlias();
+ Byte type = DataType.findTypeByName(tupleElement.getType());
+
+ // reserved word
+ if ("rank".equals(alias)) {
+ tuple.append(rank);
+ } else {
+ JsonNode field = hit;
+                String[] path = alias.split("/"); // TODO: precompute outside the hit loop
+ for (String p : path) {
+ field = field.get(p);
+ if (field == null) {
+ type = DataType.NULL; // effectively skip field as it is not found
+ break;
+ }
+ }
+ switch (type) {
+ case DataType.BOOLEAN:
+ tuple.append(field.asBoolean());
+ break;
+ case DataType.INTEGER:
+ tuple.append(field.asInt());
+ break;
+ case DataType.LONG:
+ tuple.append(field.asLong());
+ break;
+ case DataType.FLOAT:
+ case DataType.DOUBLE:
+ tuple.append(field.asDouble());
+ break;
+ case DataType.DATETIME:
+ tuple.append(field.asText());
+ break;
+ case DataType.CHARARRAY:
+ tuple.append(field.asText());
+ break;
+ default:
+ // the rest of the data types are currently not supported
+ }
+ }
+ }
+ return tuple;
+ }
+
+ public static Schema getPigSchema(String schemaString) {
+ Schema schema = null;
+ schemaString = schemaString.replace("/", "_");
+ schemaString = "{(" + schemaString + ")}";
+ try {
+ schema = Utils.getSchemaFromString(schemaString);
+ } catch (ParserException e) {
+ e.printStackTrace();
+ }
+ return schema;
+ }
+
+ @Override
+ public Iterator<AliasTypePair> iterator() {
+ return tupleSchema.iterator();
+ }
+
+
+ public static class AliasTypePair {
+ private final String alias;
+ private final String type;
+
+ AliasTypePair(String alias, String type) {
+ this.alias = alias;
+ this.type = type;
+ }
+
+ public String getAlias() {
+ return alias;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ }
+
+}
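
A sketch of how a schema string maps a result hit to a Pig tuple; the hit JSON is illustrative, and "rank" is the reserved alias filled with the hit's position:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.yahoo.vespa.hadoop.mapreduce.util.VespaQuerySchema;
    import org.apache.pig.data.Tuple;

    public class SchemaDemo {
        public static void main(String[] args) throws Exception {
            VespaQuerySchema schema = new VespaQuerySchema("rank:int,fields/documentid:chararray");
            Tuple tuple = schema.buildTuple(0, new ObjectMapper().readTree(
                    "{\"fields\": {\"documentid\": \"id:test:doc::1\"}}"));
            System.out.println(tuple);  // (0,id:test:doc::1)
        }
    }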
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
new file mode 100644
index 00000000000..017ffcdd215
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
@@ -0,0 +1,363 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigWarning;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.joda.time.DateTime;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.*;
+
+/**
+ * A Pig UDF to convert simple Pig types into a valid Vespa JSON document format.
+ *
+ * @author lesters
+ */
+public class VespaDocumentOperation extends EvalFunc<String> {
+
+ public enum Operation {
+ DOCUMENT,
+ PUT,
+ ID,
+ REMOVE,
+ UPDATE;
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase();
+ }
+
+ public static Operation fromString(String text) {
+ for (Operation op : Operation.values()) {
+ if (op.toString().equalsIgnoreCase(text)) {
+ return op;
+ }
+ }
+ throw new IllegalArgumentException("Unknown operation: " + text);
+ }
+
+ public static boolean valid(String text) {
+ for (Operation op : Operation.values()) {
+ if (op.toString().equalsIgnoreCase(text)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ }
+
+ private static final String PROPERTY_ID_TEMPLATE = "docid";
+ private static final String PROPERTY_OPERATION = "operation";
+ private static final String SIMPLE_ARRAY_FIELDS = "simple-array-fields";
+ private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields";
+ private static final String EXCLUDE_FIELDS = "exclude-fields";
+
+ private static final String PARTIAL_UPDATE_ASSIGN = "assign";
+
+ private final String template;
+ private final Operation operation;
+ private final Properties properties;
+
+ public VespaDocumentOperation(String... params) {
+ properties = VespaConfiguration.loadProperties(params);
+ template = properties.getProperty(PROPERTY_ID_TEMPLATE);
+ operation = Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put"));
+ }
+
+ @Override
+ public String exec(Tuple tuple) throws IOException {
+ if (tuple == null || tuple.size() == 0) {
+ return null;
+ }
+ if (template == null || template.length() == 0) {
+ warn("No valid document id template found. Skipping.", PigWarning.UDF_WARNING_1);
+ return null;
+ }
+ if (operation == null) {
+ warn("No valid operation found. Skipping.", PigWarning.UDF_WARNING_1);
+ return null;
+ }
+
+ String json = null;
+
+ try {
+ if (reporter != null) {
+ reporter.progress();
+ }
+
+ Map<String, Object> fields = TupleTools.tupleMap(getInputSchema(), tuple);
+ String docId = TupleTools.toString(fields, template);
+
+ // create json
+ json = create(operation, docId, fields, properties);
+ if (json == null || json.length() == 0) {
+ warn("No valid document operation could be created.", PigWarning.UDF_WARNING_1);
+ return null;
+ }
+
+
+ } catch (Exception e) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Caught exception processing input row: \n");
+ sb.append(tuple.toString());
+ sb.append("\nException: ");
+ sb.append(ExceptionUtils.getStackTrace(e));
+ warn(sb.toString(), PigWarning.UDF_WARNING_1);
+ return null;
+ }
+
+ return json;
+ }
+
+
+ /**
+ * Create a JSON Vespa document operation given the supplied fields,
+ * operation and document id template.
+ *
+     * @param op         Operation (put, remove, update)
+     * @param docId      Document id
+     * @param fields     Fields to put in document operation
+     * @param properties Properties controlling output, e.g. excluded and tensor fields
+     * @return           A valid JSON Vespa document operation
+     * @throws IOException if the JSON could not be generated
+ */
+ public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties) throws IOException {
+ if (op == null) {
+ return null;
+ }
+ if (docId == null || docId.length() == 0) {
+ return null;
+ }
+ if (fields.isEmpty()) {
+ return null;
+ }
+
+ // create json format
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+ g.writeStartObject();
+
+ g.writeStringField(op.toString(), docId);
+
+ if (op != Operation.REMOVE) {
+ writeField("fields", fields, DataType.MAP, g, properties, op, 0);
+ }
+
+ g.writeEndObject();
+ g.close();
+
+ return out.toString();
+ }
+
+
+ @SuppressWarnings("unchecked")
+ private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Operation op, int depth) throws IOException {
+ if (shouldWriteField(name, properties, depth)) {
+ g.writeFieldName(name);
+ if (shouldWritePartialUpdate(op, depth)) {
+ writePartialUpdate(value, type, g, name, properties, op, depth);
+ } else {
+ writeValue(value, type, g, name, properties, op, depth);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void writeValue(Object value, Byte type, JsonGenerator g, String name, Properties properties, Operation op, int depth) throws IOException {
+ switch (type) {
+ case DataType.UNKNOWN:
+ break;
+ case DataType.NULL:
+ g.writeNull();
+ break;
+ case DataType.BOOLEAN:
+ g.writeBoolean((boolean) value);
+ break;
+ case DataType.INTEGER:
+ g.writeNumber((int) value);
+ break;
+ case DataType.LONG:
+ g.writeNumber((long) value);
+ break;
+ case DataType.FLOAT:
+ g.writeNumber((float) value);
+ break;
+ case DataType.DOUBLE:
+ g.writeNumber((double) value);
+ break;
+ case DataType.DATETIME:
+ g.writeNumber(((DateTime) value).getMillis());
+ break;
+ case DataType.BYTEARRAY:
+ DataByteArray bytes = (DataByteArray) value;
+ String raw = Base64.getEncoder().encodeToString(bytes.get());
+ g.writeString(raw);
+ break;
+ case DataType.CHARARRAY:
+ g.writeString((String) value);
+ break;
+ case DataType.BIGINTEGER:
+ g.writeNumber((BigInteger) value);
+ break;
+ case DataType.BIGDECIMAL:
+ g.writeNumber((BigDecimal) value);
+ break;
+ case DataType.MAP:
+ g.writeStartObject();
+ Map<Object, Object> map = (Map<Object, Object>) value;
+ if (shouldCreateTensor(map, name, properties)) {
+ writeTensor(map, g);
+ } else {
+ for (Map.Entry<Object, Object> entry : map.entrySet()) {
+ String k = entry.getKey().toString();
+ Object v = entry.getValue();
+ Byte t = DataType.findType(v);
+ writeField(k, v, t, g, properties, op, depth+1);
+ }
+ }
+ g.writeEndObject();
+ break;
+ case DataType.TUPLE:
+ Tuple tuple = (Tuple) value;
+ boolean writeStartArray = shouldWriteTupleStart(tuple, name, properties);
+ if (writeStartArray) {
+ g.writeStartArray();
+ }
+ for (Object v : tuple) {
+ writeValue(v, DataType.findType(v), g, name, properties, op, depth);
+ }
+ if (writeStartArray) {
+ g.writeEndArray();
+ }
+ break;
+ case DataType.BAG:
+ DataBag bag = (DataBag) value;
+ g.writeStartArray();
+ for (Tuple t : bag) {
+ writeValue(t, DataType.TUPLE, g, name, properties, op, depth);
+ }
+ g.writeEndArray();
+ break;
+ }
+
+ }
+
+ private static boolean shouldWritePartialUpdate(Operation op, int depth) {
+ return op == Operation.UPDATE && depth == 1;
+ }
+
+ private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Operation op, int depth) throws IOException {
+ g.writeStartObject();
+ g.writeFieldName(PARTIAL_UPDATE_ASSIGN); // TODO: lookup field name in a property to determine correct operation
+ writeValue(value, type, g, name, properties, op, depth);
+ g.writeEndObject();
+ }
+
+ private static boolean shouldWriteTupleStart(Tuple tuple, String name, Properties properties) {
+ if (tuple.size() > 1 || properties == null) {
+ return true;
+ }
+ String simpleArrayFields = properties.getProperty(SIMPLE_ARRAY_FIELDS);
+ if (simpleArrayFields == null) {
+ return true;
+ }
+ if (simpleArrayFields.equals("*")) {
+ return false;
+ }
+ String[] fields = simpleArrayFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static boolean shouldCreateTensor(Map<Object, Object> map, String name, Properties properties) {
+ if (properties == null) {
+ return false;
+ }
+ String tensorFields = properties.getProperty(CREATE_TENSOR_FIELDS);
+ if (tensorFields == null) {
+ return false;
+ }
+ String[] fields = tensorFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static boolean shouldWriteField(String name, Properties properties, int depth) {
+ if (properties == null || depth != 1) {
+ return true;
+ }
+ String excludeFields = properties.getProperty(EXCLUDE_FIELDS);
+ if (excludeFields == null) {
+ return true;
+ }
+ String[] fields = excludeFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static void writeTensor(Map<Object, Object> map, JsonGenerator g) throws IOException {
+ g.writeFieldName("cells");
+ g.writeStartArray();
+ for (Map.Entry<Object, Object> entry : map.entrySet()) {
+ String k = entry.getKey().toString();
+ Double v = Double.parseDouble(entry.getValue().toString());
+
+ g.writeStartObject();
+
+ // Write address
+ g.writeFieldName("address");
+ g.writeStartObject();
+
+ String[] dimensions = k.split(",");
+ for (String dimension : dimensions) {
+ if (dimension == null || dimension.isEmpty()) {
+ continue;
+ }
+ String[] address = dimension.split(":");
+ if (address.length != 2) {
+ throw new IllegalArgumentException("Malformed cell address: " + dimension);
+ }
+ String dim = address[0];
+ String label = address[1];
+ if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) {
+ throw new IllegalArgumentException("Malformed cell address: " + dimension);
+ }
+ g.writeFieldName(dim.trim());
+ g.writeString(label.trim());
+ }
+ g.writeEndObject();
+
+ // Write value
+ g.writeFieldName("value");
+ g.writeNumber(v);
+
+ g.writeEndObject();
+ }
+ g.writeEndArray();
+ }
+
+}
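
The static create method can be exercised directly; a sketch with an illustrative document id and field:

    import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class DocOpDemo {
        public static void main(String[] args) throws Exception {
            Map<String, Object> fields = new HashMap<>();
            fields.put("title", "Best of");
            String json = VespaDocumentOperation.create(
                    VespaDocumentOperation.Operation.PUT,
                    "id:mynamespace:music::doc1", fields, new Properties());
            System.out.println(json);
            // {"put":"id:mynamespace:music::doc1","fields":{"title":"Best of"}}
        }
    }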
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java
new file mode 100644
index 00000000000..e38e87dda57
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java
@@ -0,0 +1,113 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaHttpClient;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaQuerySchema;
+import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.UDFContext;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A Pig UDF to run a query against a Vespa cluster and return the
+ * results.
+ *
+ * @author lesters
+ */
+public class VespaQuery extends EvalFunc<DataBag> {
+
+    private static final String PROPERTY_QUERY_TEMPLATE = "query";
+    private static final String PROPERTY_QUERY_SCHEMA = "schema";
+    private static final String PROPERTY_ROOT_NODE = "rootnode";
+
+ private final VespaConfiguration configuration;
+ private final Properties properties;
+ private final String queryTemplate;
+ private final String querySchema;
+ private final String queryRootNode;
+
+ private VespaHttpClient httpClient;
+
+ public VespaQuery() {
+ this(new String[0]);
+ }
+
+ public VespaQuery(String... params) {
+ configuration = VespaConfiguration.get(UDFContext.getUDFContext().getJobConf(), null);
+ properties = VespaConfiguration.loadProperties(params);
+
+ queryTemplate = properties.getProperty(PROPERTY_QUERY_TEMPLATE);
+ if (queryTemplate == null || queryTemplate.isEmpty()) {
+ throw new IllegalArgumentException("Query template cannot be empty");
+ }
+
+ querySchema = properties.getProperty(PROPERTY_QUERY_SCHEMA, "rank:int,id:chararray");
+ queryRootNode = properties.getProperty(PROPERTY_ROOT_NODE, "root/children");
+ }
+
+ @Override
+ public DataBag exec(Tuple input) throws IOException {
+ if (input == null || input.size() == 0) {
+ return null;
+ }
+ JsonNode jsonResult = queryVespa(input);
+ if (jsonResult == null) {
+ return null;
+ }
+ return createPigRepresentation(jsonResult);
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ return VespaQuerySchema.getPigSchema(querySchema);
+ }
+
+
+ private JsonNode queryVespa(Tuple input) throws IOException {
+ String url = createVespaQueryUrl(input);
+ if (url == null) {
+ return null;
+ }
+ String result = executeVespaQuery(url);
+ return parseVespaResultJson(result);
+ }
+
+
+ private String createVespaQueryUrl(Tuple input) throws IOException {
+ return TupleTools.toString(getInputSchema(), input, queryTemplate);
+ }
+
+
+ private String executeVespaQuery(String url) throws IOException {
+ if (httpClient == null) {
+ httpClient = new VespaHttpClient(configuration);
+ }
+ return httpClient.get(url);
+ }
+
+ private JsonNode parseVespaResultJson(String result) throws IOException {
+ return httpClient == null ? null : httpClient.parseResultJson(result, queryRootNode);
+ }
+
+ private DataBag createPigRepresentation(JsonNode hits) {
+ DataBag bag = new SortedDataBag(null);
+ VespaQuerySchema querySchema = new VespaQuerySchema(this.querySchema);
+
+ for (int rank = 0; rank < hits.size(); ++rank) {
+ JsonNode hit = hits.get(rank);
+ Tuple tuple = querySchema.buildTuple(rank, hit);
+ bag.add(tuple);
+ }
+
+ return bag;
+ }
+
+}
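
The UDF is parameterized with simple key=value strings; a sketch with a hypothetical endpoint and field names, where <userid> is filled in from the input tuple via TupleTools:

    import com.yahoo.vespa.hadoop.pig.VespaQuery;

    public class QueryUdfDemo {
        public static void main(String[] args) {
            VespaQuery udf = new VespaQuery(
                    "query=http://vespa-host.example.com:8080/search/?query=userid:<userid>&hits=10",
                    "schema=rank:int,fields/documentid:chararray",
                    "rootnode=root/children");
            // within Pig, exec(tuple) resolves <userid>, runs the query and
            // returns a bag of tuples laid out according to the schema
            System.out.println(udf.outputSchema(null));
        }
    }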
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java
new file mode 100644
index 00000000000..66f04be657f
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java
@@ -0,0 +1,62 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.yahoo.vespa.hadoop.mapreduce.VespaSimpleJsonInputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import java.io.IOException;
+
+/**
+ * Simple JSON loader which loads either one JSON object per line or a
+ * multiline JSON consisting of objects in an array.
+ *
+ * Returns only the textual representation of the JSON object.
+ *
+ * @author lesters
+ */
+@SuppressWarnings("rawtypes")
+public class VespaSimpleJsonLoader extends LoadFunc {
+
+ private TupleFactory tupleFactory = TupleFactory.getInstance();
+ private VespaSimpleJsonInputFormat.VespaJsonRecordReader recordReader;
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ FileInputFormat.setInputPaths(job, location);
+ }
+
+ @Override
+ public InputFormat getInputFormat() throws IOException {
+ return new VespaSimpleJsonInputFormat();
+ }
+
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
+ recordReader = (VespaSimpleJsonInputFormat.VespaJsonRecordReader) reader;
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
+ try {
+            if (!recordReader.nextKeyValue()) {
+                return null;
+            }
+ Text json = recordReader.getCurrentKey();
+ if (json == null) {
+ return null;
+ }
+ return tupleFactory.newTuple(json.toString());
+
+ } catch (InterruptedException ex) {
+ return null;
+ }
+ }
+}
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
new file mode 100644
index 00000000000..d5000b2b328
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
@@ -0,0 +1,187 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
+import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+import java.io.*;
+import java.util.Base64;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A small Pig UDF wrapper around the Vespa http client for
+ * feeding data into a Vespa endpoint.
+ *
+ * @author lesters
+ */
+@SuppressWarnings("rawtypes")
+public class VespaStorage extends StoreFunc {
+
+ private final boolean createDocOp;
+ private final String template;
+ private final VespaDocumentOperation.Operation operation;
+
+ private String signature = null;
+ private RecordWriter recordWriter = null;
+ private ResourceSchema resourceSchema = null;
+ private Properties properties = new Properties();
+
+ private static final String PROPERTY_CREATE_DOC_OP = "create-document-operation";
+ private static final String PROPERTY_ID_TEMPLATE = "docid";
+ private static final String PROPERTY_OPERATION = "operation";
+ private static final String PROPERTY_RESOURCE_SCHEMA = "resource_schema";
+
+ public VespaStorage() {
+ createDocOp = false;
+ template = null;
+ operation = null;
+ }
+
+ public VespaStorage(String... params) {
+ properties = VespaConfiguration.loadProperties(params);
+ createDocOp = Boolean.parseBoolean(properties.getProperty(PROPERTY_CREATE_DOC_OP, "false"));
+ operation = VespaDocumentOperation.Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put"));
+ template = properties.getProperty(PROPERTY_ID_TEMPLATE);
+ }
+
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ return new VespaOutputFormat(properties);
+ }
+
+
+ @Override
+ public void setStoreLocation(String endpoint, Job job) throws IOException {
+ properties.setProperty(VespaConfiguration.ENDPOINT, endpoint);
+ }
+
+
+ @Override
+ public void prepareToWrite(RecordWriter recordWriter) throws IOException {
+ this.recordWriter = recordWriter;
+ this.resourceSchema = getResourceSchema();
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void putNext(Tuple tuple) throws IOException {
+ if (tuple == null || tuple.size() == 0) {
+ return;
+ }
+
+ String data = null;
+ if (createDocOp) {
+ data = createDocumentOperation(tuple);
+ } else if (!tuple.isNull(0)) {
+ data = tuple.get(0).toString(); // assume single field with correctly formatted doc op.
+ }
+
+ if (data == null || data.length() == 0) {
+ return;
+ }
+
+ try {
+ recordWriter.write(0, data);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+
+ @Override
+ public void checkSchema(ResourceSchema resourceSchema) throws IOException {
+ setResourceSchema(resourceSchema);
+ }
+
+
+ @Override
+ public String relToAbsPathForStoreLocation(String endpoint, Path path) throws IOException {
+ return endpoint;
+ }
+
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String s) {
+ this.signature = s;
+ }
+
+
+ @Override
+ public void cleanupOnFailure(String s, Job job) throws IOException {
+ }
+
+
+ @Override
+ public void cleanupOnSuccess(String s, Job job) throws IOException {
+ }
+
+
+ private ResourceSchema getResourceSchema() throws IOException {
+ Properties properties = getUDFProperties();
+ return base64Deserialize(properties.getProperty(PROPERTY_RESOURCE_SCHEMA));
+ }
+
+
+ private void setResourceSchema(ResourceSchema schema) throws IOException {
+ Properties properties = getUDFProperties();
+ if (properties.getProperty(PROPERTY_RESOURCE_SCHEMA) == null) {
+ properties.setProperty(PROPERTY_RESOURCE_SCHEMA, base64Serialize(schema));
+ }
+ }
+
+
+ private Properties getUDFProperties() {
+ String[] context = { signature };
+ return UDFContext.getUDFContext().getUDFProperties(getClass(), context);
+ }
+
+
+ private String createDocumentOperation(Tuple tuple) throws IOException {
+ if (tuple == null || tuple.size() == 0) {
+ return null;
+ }
+ if (resourceSchema == null) {
+ return null;
+ }
+
+ Map<String, Object> fields = TupleTools.tupleMap(resourceSchema, tuple);
+ String docId = TupleTools.toString(fields, template);
+
+ return VespaDocumentOperation.create(operation, docId, fields, properties);
+ }
+
+
+ public static String base64Serialize(Object o) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(o);
+ }
+ return Base64.getEncoder().encodeToString(baos.toByteArray());
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public static <T extends Serializable> T base64Deserialize(String s) throws IOException {
+ Object ret;
+ byte[] data = Base64.getDecoder().decode(s);
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
+ try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+ ret = ois.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ return (T) ret;
+ }
+
+}
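
The base64 helpers at the bottom round-trip any Serializable; this is how the resource schema travels from the Pig front end to the backend tasks. A tiny sketch:

    import com.yahoo.vespa.hadoop.pig.VespaStorage;

    public class SerializeDemo {
        public static void main(String[] args) throws Exception {
            String encoded = VespaStorage.base64Serialize("hello");
            String decoded = VespaStorage.base64Deserialize(encoded);
            System.out.println(decoded);  // hello
        }
    }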