Diffstat (limited to 'vespa-hadoop/src')
34 files changed, 2930 insertions, 0 deletions
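For orientation, the new module is driven through standard Hadoop job configuration. A minimal feeding job, sketched from the MapReduceTest added below (the endpoint value and input path are placeholders, not part of this change), could look like this:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
    import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;

    public class FeedDriver {

        /** Identity mapper: each input line is assumed to already be a Vespa document operation. */
        public static class FeedMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws java.io.IOException, InterruptedException {
                context.write(key, value);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set(VespaConfiguration.ENDPOINT, "feed-endpoint.example.com");  // placeholder
            conf.set(VespaConfiguration.DATA_FORMAT, "json");

            Job job = Job.getInstance(conf);
            job.setJarByClass(FeedDriver.class);
            job.setMapperClass(FeedMapper.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputFormatClass(VespaOutputFormat.class);  // feeds via VespaRecordWriter
            FileInputFormat.setInputPaths(job, args[0]);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }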
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java new file mode 100644 index 00000000000..cd0f65496d0 --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputCommitter.java @@ -0,0 +1,36 @@ +package com.yahoo.vespa.hadoop.mapreduce; + +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * The output committer describes the commit task output for a Map-Reduce + * job. Not currently used, but is part of the Hadoop protocol since 2.7. + * + * @author lesters + */ +public class VespaOutputCommitter extends OutputCommitter { + @Override + public void setupJob(JobContext jobContext) throws IOException { + } + + @Override + public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException { + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException { + return false; + } + + @Override + public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException { + } + + @Override + public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException { + } +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java new file mode 100644 index 00000000000..720a6adf477 --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java @@ -0,0 +1,51 @@ +package com.yahoo.vespa.hadoop.mapreduce; + +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; +import org.apache.hadoop.mapreduce.*; + +import java.io.IOException; +import java.util.Properties; + +/** + * An output specification for writing to Vespa instances in a Map-Reduce job. + * Mainly returns an instance of a {@link VespaRecordWriter} that does the + * actual feeding to Vespa. 
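+ * A typical driver sets this class as the job's output format with
+ * job.setOutputFormatClass(VespaOutputFormat.class) and points the
+ * vespa.feed.endpoint property at the target cluster; see MapReduceTest
+ * for complete examples.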
+ * + * @author lesters + */ +@SuppressWarnings("rawtypes") +public class VespaOutputFormat extends OutputFormat { + + private final Properties configOverride; + + public VespaOutputFormat() { + super(); + this.configOverride = null; + } + + public VespaOutputFormat(Properties configOverride) { + super(); + this.configOverride = configOverride; + } + + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { + VespaCounters counters = VespaCounters.get(context); + VespaConfiguration configuration = VespaConfiguration.get(context.getConfiguration(), configOverride); + return new VespaRecordWriter(configuration, counters); + } + + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + return new VespaOutputCommitter(); + } + + + @Override + public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { + } + +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java new file mode 100644 index 00000000000..8072e99e2ab --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java @@ -0,0 +1,222 @@ +package com.yahoo.vespa.hadoop.mapreduce; + +import java.io.IOException; +import java.io.StringReader; +import java.util.List; +import java.util.Random; +import java.util.StringTokenizer; +import java.util.logging.Logger; + +import javax.xml.namespace.QName; +import javax.xml.stream.FactoryConfigurationError; +import javax.xml.stream.XMLEventReader; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.events.StartElement; +import javax.xml.stream.events.XMLEvent; + +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; +import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation; +import com.yahoo.vespa.http.client.FeedClient; +import com.yahoo.vespa.http.client.FeedClientFactory; +import com.yahoo.vespa.http.client.Result; +import com.yahoo.vespa.http.client.config.Cluster; +import com.yahoo.vespa.http.client.config.ConnectionParams; +import com.yahoo.vespa.http.client.config.Endpoint; +import com.yahoo.vespa.http.client.config.FeedParams; +import com.yahoo.vespa.http.client.config.FeedParams.DataFormat; +import com.yahoo.vespa.http.client.config.SessionParams; +import org.apache.hadoop.mapreduce.v2.app.job.Task; + +/** + * VespaRecordWriter sends the output <key, value> to one or more Vespa + * endpoints. 
+ * + * @author lesters + */ +@SuppressWarnings("rawtypes") +public class VespaRecordWriter extends RecordWriter { + + private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName()); + + private boolean initialized = false; + private FeedClient feedClient; + + private final VespaCounters counters; + private final VespaConfiguration configuration; + private final int progressInterval; + + VespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) { + this.counters = counters; + this.configuration = configuration; + this.progressInterval = configuration.progressInterval(); + } + + + @Override + public void write(Object key, Object data) throws IOException, InterruptedException { + if (!initialized) { + initialize(); + } + + String doc = data.toString().trim(); + + // Parse data to find document id - if none found, skip this write + String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc) + : findDocIdFromXml(doc); + if (docId != null && docId.length() >= 0) { + feedClient.stream(docId, doc); + counters.incrementDocumentsSent(1); + } else { + counters.incrementDocumentsSkipped(1); + } + + if (counters.getDocumentsSent() % progressInterval == 0) { + String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)", + counters.getDocumentsSent(), + counters.getDocumentsOk(), + counters.getDocumentsFailed(), + counters.getDocumentsSkipped()); + log.info(progress); + } + + } + + + @Override + public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + if (feedClient != null) { + feedClient.close(); + } + } + + + private void initialize() { + if (!configuration.dryrun() && configuration.randomSartupSleepMs() > 0) { + int delay = new Random().nextInt(configuration.randomSartupSleepMs()); + log.info("VespaStorage: Delaying startup by " + delay + " ms"); + try { + Thread.sleep(delay); + } catch (Exception e) {} + } + + ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder(); + connParamsBuilder.setDryRun(configuration.dryrun()); + connParamsBuilder.setUseCompression(configuration.useCompression()); + connParamsBuilder.setEnableV3Protocol(configuration.useV3Protocol()); + connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections()); + connParamsBuilder.setMaxRetries(configuration.numRetries()); + if (configuration.proxyHost() != null) { + connParamsBuilder.setProxyHost(configuration.proxyHost()); + } + if (configuration.proxyPort() >= 0) { + connParamsBuilder.setProxyPort(configuration.proxyPort()); + } + + FeedParams.Builder feedParamsBuilder = new FeedParams.Builder(); + feedParamsBuilder.setDataFormat(configuration.dataFormat()); + feedParamsBuilder.setRoute(configuration.route()); + feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs()); + feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests()); + + SessionParams.Builder sessionParams = new SessionParams.Builder(); + sessionParams.setThrottlerMinSize(configuration.throttlerMinSize()); + sessionParams.setConnectionParams(connParamsBuilder.build()); + sessionParams.setFeedParams(feedParamsBuilder.build()); + + String endpoints = configuration.endpoint(); + StringTokenizer tokenizer = new StringTokenizer(endpoints, ","); + while (tokenizer.hasMoreTokens()) { + String endpoint = tokenizer.nextToken().trim(); + sessionParams.addCluster(new Cluster.Builder().addEndpoint( + Endpoint.create(endpoint, 4080, false) + 
).build()); + } + + ResultCallback resultCallback = new ResultCallback(counters); + feedClient = FeedClientFactory.create(sessionParams.build(), resultCallback); + + initialized = true; + log.info("VespaStorage configuration:\n" + configuration.toString()); + } + + private String findDocIdFromXml(String xml) { + try { + XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml)); + while (eventReader.hasNext()) { + XMLEvent event = eventReader.nextEvent(); + if (event.getEventType() == XMLEvent.START_ELEMENT) { + StartElement element = event.asStartElement(); + String elementName = element.getName().getLocalPart(); + if (VespaDocumentOperation.Operation.valid(elementName)) { + return element.getAttributeByName(QName.valueOf("documentid")).getValue(); + } + } + } + } catch (XMLStreamException | FactoryConfigurationError e) { + // as json dude does + return null; + } + return null; + } + + private String findDocId(String json) throws IOException { + JsonFactory factory = new JsonFactory(); + try(JsonParser parser = factory.createParser(json)) { + if (parser.nextToken() != JsonToken.START_OBJECT) { + return null; + } + while (parser.nextToken() != JsonToken.END_OBJECT) { + String fieldName = parser.getCurrentName(); + parser.nextToken(); + if (VespaDocumentOperation.Operation.valid(fieldName)) { + String docId = parser.getText(); + return docId; + } else { + parser.skipChildren(); + } + } + } catch (JsonParseException ex) { + return null; + } + return null; + } + + + static class ResultCallback implements FeedClient.ResultCallback { + final VespaCounters counters; + + public ResultCallback(VespaCounters counters) { + this.counters = counters; + } + + @Override + public void onCompletion(String docId, Result documentResult) { + if (!documentResult.isSuccess()) { + counters.incrementDocumentsFailed(1); + StringBuilder sb = new StringBuilder(); + sb.append("Problems with docid "); + sb.append(docId); + sb.append(": "); + List<Result.Detail> details = documentResult.getDetails(); + for (Result.Detail detail : details) { + sb.append(detail.toString()); + sb.append(" "); + } + log.warning(sb.toString()); + return; + } + counters.incrementDocumentsOk(1); + } + + } + +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java new file mode 100644 index 00000000000..d7bdc592fd5 --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java @@ -0,0 +1,98 @@ +package com.yahoo.vespa.hadoop.mapreduce; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +import java.io.BufferedInputStream; +import java.io.IOException; + +/** + * Simple JSON reader which splits the input file along JSON object boundaries. + * + * There are two cases handled here: + * 1. Each line contains a JSON object, i.e. { ... 
} + * 2. The file contains an array of objects with arbitrary line breaks, i.e. [ {...}, {...} ] + * + * Not suitable for cases where you want to extract objects from some other arbitrary structure. + * + * TODO: Support config which points to a array in the JSON as start point for object extraction, + * ala how it is done in VespaHttpClient.parseResultJson, i.e. support rootNode config. + * + * @author lesters + */ +public class VespaSimpleJsonInputFormat extends FileInputFormat<Text, NullWritable> { + + @Override + public RecordReader<Text, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + return new VespaJsonRecordReader(); + } + + public static class VespaJsonRecordReader extends RecordReader<Text, NullWritable> { + private long remaining; + private JsonParser parser; + private Text currentKey; + private NullWritable currentValue = NullWritable.get(); + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + FileSplit fileSplit = (FileSplit) split; + FSDataInputStream stream = FileSystem.get(context.getConfiguration()).open(fileSplit.getPath()); + if (fileSplit.getStart() != 0) { + stream.seek(fileSplit.getStart()); + } + + remaining = fileSplit.getLength(); + + JsonFactory factory = new JsonFactory(); + parser = factory.createParser(new BufferedInputStream(stream)); + parser.setCodec(new ObjectMapper()); + parser.nextToken(); + if (parser.currentToken() == JsonToken.START_ARRAY) { + parser.nextToken(); + } + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (parser.currentToken() != JsonToken.START_OBJECT) { + return true; + } + currentKey = new Text(parser.readValueAsTree().toString()); + parser.nextToken(); + return false; + } + + @Override + public Text getCurrentKey() throws IOException, InterruptedException { + return currentKey; + } + + @Override + public NullWritable getCurrentValue() throws IOException, InterruptedException { + return currentValue; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return parser.getCurrentLocation().getByteOffset() / remaining; + } + + @Override + public void close() throws IOException { + parser.close(); + } + } + +} + diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java new file mode 100644 index 00000000000..1d11a3f9fdd --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java @@ -0,0 +1,69 @@ +package com.yahoo.vespa.hadoop.mapreduce.util; + +import org.apache.pig.ResourceSchema; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.schema.Schema; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class TupleTools { + + private static final Pattern pattern = Pattern.compile("<([\\w]+)>"); + + public static Map<String, Object> tupleMap(Schema schema, Tuple tuple) throws IOException { + Map<String, Object> tupleMap = new HashMap<>((int)Math.ceil(tuple.size() / 0.75) + 1); + List<Schema.FieldSchema> schemas = schema.getFields(); + for (int i = 0; i < schemas.size(); i++) { + Schema.FieldSchema field = schemas.get(i); + String alias = field.alias; + Object value = tuple.get(i); + if (value != null) { + 
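// null values are skipped; toString(...) below then leaves the corresponding <field> token in the template unexpanded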
tupleMap.put(alias, value); + } + } + return tupleMap; + } + + public static Map<String, Object> tupleMap(ResourceSchema schema, Tuple tuple) throws IOException { + Map<String, Object> tupleMap = new HashMap<>((int)Math.ceil(tuple.size() / 0.75) + 1); + ResourceSchema.ResourceFieldSchema[] schemas = schema.getFields(); + for (int i = 0; i < schemas.length; i++) { + ResourceSchema.ResourceFieldSchema field = schemas[i]; + String alias = field.getName(); + Object value = tuple.get(i); + if (value != null) { + tupleMap.put(alias, value); + } + } + return tupleMap; + } + + public static String toString(Schema schema, Tuple tuple, String template) throws IOException { + return toString(tupleMap(schema, tuple), template); + } + + public static String toString(Map<String,Object> fields, String template) { + if (template == null || template.length() == 0) { + return template; + } + if (fields == null || fields.size() == 0) { + return template; + } + + Matcher m = pattern.matcher(template); + StringBuffer sb = new StringBuffer(); + while (m.find()) { + Object value = fields.get(m.group(1)); + String replacement = value != null ? value.toString() : m.group(0); + m.appendReplacement(sb, Matcher.quoteReplacement(replacement)); + } + m.appendTail(sb); + return sb.toString(); + } + +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java new file mode 100644 index 00000000000..6e35e5e465a --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java @@ -0,0 +1,187 @@ +package com.yahoo.vespa.hadoop.mapreduce.util; + +import com.yahoo.vespa.http.client.config.FeedParams; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.io.StringReader; +import java.util.Properties; + +public class VespaConfiguration { + + public static final String ENDPOINT = "vespa.feed.endpoint"; + public static final String PROXY_HOST = "vespa.feed.proxy.host"; + public static final String PROXY_PORT = "vespa.feed.proxy.port"; + public static final String DRYRUN = "vespa.feed.dryrun"; + public static final String USE_COMPRESSION = "vespa.feed.usecompression"; + public static final String DATA_FORMAT = "vespa.feed.data.format"; + public static final String PROGRESS_REPORT = "vespa.feed.progress.interval"; + public static final String V3_PROTOCOL = "vespa.feed.v3.protocol"; + public static final String CONNECTIONS = "vespa.feed.connections"; + public static final String THROTTLER_MIN_SIZE = "vespa.feed.throttler.min.size"; + public static final String QUERY_CONNECTION_TIMEOUT = "vespa.query.connection.timeout"; + public static final String ROUTE = "vespa.feed.route"; + public static final String MAX_SLEEP_TIME_MS = "vespa.feed.max.sleep.time.ms"; + public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests"; + public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms"; + public static final String NUM_RETRIES = "vespa.feed.num.retries"; + + private final Configuration conf; + private final Properties override; + + private VespaConfiguration(Configuration conf, Properties override) { + this.conf = conf; + this.override = override; + } + + + public static VespaConfiguration get(Configuration conf, Properties override) { + return new VespaConfiguration(conf, override); + } + + + public String endpoint() { + return getString(ENDPOINT); + } + + + public String 
proxyHost() { + return getString(PROXY_HOST); + } + + + public int proxyPort() { + return getInt(PROXY_PORT, 4080); + } + + + public boolean dryrun() { + return getBoolean(DRYRUN, false); + } + + + public boolean useCompression() { + return getBoolean(USE_COMPRESSION, true); + } + + + public boolean useV3Protocol() { + return getBoolean(V3_PROTOCOL, true); + } + + + public int numConnections() { + return getInt(CONNECTIONS, 2); + } + + + public int throttlerMinSize() { + return getInt(THROTTLER_MIN_SIZE, 0); + } + + + public int queryConnectionTimeout() { + return getInt(QUERY_CONNECTION_TIMEOUT, 10000); + } + + + public String route() { + return getString(ROUTE); + } + + + public int maxSleepTimeMs() { + return getInt(MAX_SLEEP_TIME_MS, 10000); + } + + + public int maxInFlightRequests() { + return getInt(MAX_IN_FLIGHT_REQUESTS, 1000); + } + + + public int randomSartupSleepMs() { + return getInt(RANDOM_STARTUP_SLEEP, 30000); + } + + + public int numRetries() { + return getInt(NUM_RETRIES, 100); + } + + + public FeedParams.DataFormat dataFormat() { + String format = getString(DATA_FORMAT); + if ("xml".equalsIgnoreCase(format)) { + return FeedParams.DataFormat.XML_UTF8; + } + return FeedParams.DataFormat.JSON_UTF8; + } + + + public int progressInterval() { + return getInt(PROGRESS_REPORT, 1000); + } + + + private String getString(String name) { + if (override != null && override.containsKey(name)) { + return override.getProperty(name); + } + return conf != null ? conf.get(name) : null; + } + + + private int getInt(String name, int defaultValue) { + if (override != null && override.containsKey(name)) { + return Integer.parseInt(override.getProperty(name)); + } + return conf != null ? conf.getInt(name, defaultValue) : defaultValue; + } + + + private boolean getBoolean(String name, boolean defaultValue) { + if (override != null && override.containsKey(name)) { + return Boolean.parseBoolean(override.getProperty(name)); + } + return conf != null ? conf.getBoolean(name, defaultValue) : defaultValue; + + } + + public static Properties loadProperties(String... 
params) { + Properties properties = new Properties(); + if (params != null) { + for (String s : params) { + try { + properties.load(new StringReader(s)); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + } + return properties; + } + + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(ENDPOINT + ": " + endpoint() + "\n"); + sb.append(PROXY_HOST + ": " + proxyHost() + "\n"); + sb.append(PROXY_PORT + ": " + proxyPort() + "\n"); + sb.append(DRYRUN + ": " + dryrun() +"\n"); + sb.append(USE_COMPRESSION + ": " + useCompression() +"\n"); + sb.append(DATA_FORMAT + ": " + dataFormat() +"\n"); + sb.append(PROGRESS_REPORT + ": " + progressInterval() +"\n"); + sb.append(V3_PROTOCOL + ": " + useV3Protocol() +"\n"); + sb.append(CONNECTIONS + ": " + numConnections() +"\n"); + sb.append(THROTTLER_MIN_SIZE + ": " + throttlerMinSize() +"\n"); + sb.append(QUERY_CONNECTION_TIMEOUT + ": " + queryConnectionTimeout() +"\n"); + sb.append(ROUTE + ": " + route() +"\n"); + sb.append(MAX_SLEEP_TIME_MS + ": " + maxSleepTimeMs() +"\n"); + sb.append(MAX_IN_FLIGHT_REQUESTS + ": " + maxInFlightRequests() +"\n"); + sb.append(RANDOM_STARTUP_SLEEP + ": " + randomSartupSleepMs() +"\n"); + sb.append(NUM_RETRIES + ": " + numRetries() +"\n"); + return sb.toString(); + } + +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java new file mode 100644 index 00000000000..dbe47c23814 --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaCounters.java @@ -0,0 +1,104 @@ +package com.yahoo.vespa.hadoop.mapreduce.util; + +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +public class VespaCounters { + + public static final String GROUP = "Vespa Feed Counters"; + public static final String DOCS_OK = "Documents ok"; + public static final String DOCS_SENT = "Documents sent"; + public static final String DOCS_FAILED = "Documents failed"; + public static final String DOCS_SKIPPED = "Documents skipped"; + + private final Counter documentsSent; + private final Counter documentsOk; + private final Counter documentsFailed; + private final Counter documentsSkipped; + + + private VespaCounters(Job job) throws IOException { + Counters counters = job.getCounters(); + documentsSent = counters.findCounter(GROUP, DOCS_SENT); + documentsOk = counters.findCounter(GROUP, DOCS_OK); + documentsFailed = counters.findCounter(GROUP, DOCS_FAILED); + documentsSkipped = counters.findCounter(GROUP, DOCS_SKIPPED); + } + + + private VespaCounters(TaskAttemptContext context) { + documentsSent = context.getCounter(GROUP, DOCS_SENT); + documentsOk = context.getCounter(GROUP, DOCS_OK); + documentsFailed = context.getCounter(GROUP, DOCS_FAILED); + documentsSkipped = context.getCounter(GROUP, DOCS_SKIPPED); + } + + + private VespaCounters(org.apache.hadoop.mapred.Counters counters) { + documentsSent = counters.findCounter(GROUP, DOCS_SENT); + documentsOk = counters.findCounter(GROUP, DOCS_OK); + documentsFailed = counters.findCounter(GROUP, DOCS_FAILED); + documentsSkipped = counters.findCounter(GROUP, DOCS_SKIPPED); + } + + + public static VespaCounters get(Job job) throws IOException { + return new VespaCounters(job); + } + + + public static VespaCounters get(TaskAttemptContext 
context) { + return new VespaCounters(context); + } + + + public static VespaCounters get(org.apache.hadoop.mapred.Counters counters) { + return new VespaCounters(counters); + + } + + + public long getDocumentsSent() { + return documentsSent.getValue(); + } + + + public void incrementDocumentsSent(long incr) { + documentsSent.increment(incr); + } + + + public long getDocumentsOk() { + return documentsOk.getValue(); + } + + + public void incrementDocumentsOk(long incr) { + documentsOk.increment(incr); + } + + + public long getDocumentsFailed() { + return documentsFailed.getValue(); + } + + + public void incrementDocumentsFailed(long incr) { + documentsFailed.increment(incr); + } + + + public long getDocumentsSkipped() { + return documentsSkipped.getValue(); + } + + + public void incrementDocumentsSkipped(long incr) { + documentsSkipped.increment(incr); + } + +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java new file mode 100644 index 00000000000..50a089c8b45 --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java @@ -0,0 +1,101 @@ +package com.yahoo.vespa.hadoop.mapreduce.util; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Scanner; + +public class VespaHttpClient { + + private final HttpClient httpClient; + + public VespaHttpClient() { + this(null); + } + + public VespaHttpClient(VespaConfiguration configuration) { + httpClient = createClient(configuration); + } + + public String get(String url) throws IOException { + HttpGet httpGet = new HttpGet(url); + HttpResponse httpResponse = httpClient.execute(httpGet); + + HttpEntity entity = httpResponse.getEntity(); + InputStream is = entity.getContent(); + + String result = ""; + Scanner scanner = new Scanner(is, "UTF-8").useDelimiter("\\A"); + if (scanner.hasNext()) { + result = scanner.next(); + } + EntityUtils.consume(entity); + + if (httpResponse.getStatusLine().getStatusCode() != 200) { + return null; + } + + return result; + } + + public JsonNode parseResultJson(String json, String rootNode) throws IOException { + if (json == null || json.isEmpty()) { + return null; + } + if (rootNode == null || rootNode.isEmpty()) { + return null; + } + + ObjectMapper m = new ObjectMapper(); + JsonNode node = m.readTree(json); + if (node != null) { + String[] path = rootNode.split("/"); + for (String p : path) { + node = node.get(p); + + if (node == null) { + return null; + } + + // if node is an array, return the first node that has the correct path + if (node.isArray()) { + for (int i = 0; i < node.size(); ++i) { + JsonNode n = node.get(i); + if (n.has(p)) { + node = n; + break; + } + } + } + + } + } + return node; + } + + private HttpClient createClient(VespaConfiguration configuration) { + HttpClientBuilder clientBuilder = HttpClientBuilder.create(); + + RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); + if (configuration != null) { + 
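// Both the socket and connect timeout come from the single vespa.query.connection.timeout setting (default 10000 ms).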
requestConfigBuilder.setSocketTimeout(configuration.queryConnectionTimeout()); + requestConfigBuilder.setConnectTimeout(configuration.queryConnectionTimeout()); + if (configuration.proxyHost() != null) { + requestConfigBuilder.setProxy(new HttpHost(configuration.proxyHost(), configuration.proxyPort())); + } + } + clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build()); + return clientBuilder.build(); + } + +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java new file mode 100644 index 00000000000..0208b4165d3 --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaQuerySchema.java @@ -0,0 +1,113 @@ +package com.yahoo.vespa.hadoop.mapreduce.util; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.util.Utils; +import org.apache.pig.parser.ParserException; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class VespaQuerySchema implements Iterable<VespaQuerySchema.AliasTypePair> { + + private final List<AliasTypePair> tupleSchema = new ArrayList<>(); + + public VespaQuerySchema(String schema) { + for (String e : schema.split(",")) { + String[] pair = e.split(":"); + String alias = pair[0].trim(); + String type = pair[1].trim(); + tupleSchema.add(new AliasTypePair(alias, type)); + } + } + + public Tuple buildTuple(int rank, JsonNode hit) { + Tuple tuple = TupleFactory.getInstance().newTuple(); + + for (VespaQuerySchema.AliasTypePair tupleElement : tupleSchema) { + String alias = tupleElement.getAlias(); + Byte type = DataType.findTypeByName(tupleElement.getType()); + + // reserved word + if ("rank".equals(alias)) { + tuple.append(rank); + } else { + JsonNode field = hit; + String[] path = alias.split("/"); // move outside + for (String p : path) { + field = field.get(p); + if (field == null) { + type = DataType.NULL; // effectively skip field as it is not found + break; + } + } + switch (type) { + case DataType.BOOLEAN: + tuple.append(field.asBoolean()); + break; + case DataType.INTEGER: + tuple.append(field.asInt()); + break; + case DataType.LONG: + tuple.append(field.asLong()); + break; + case DataType.FLOAT: + case DataType.DOUBLE: + tuple.append(field.asDouble()); + break; + case DataType.DATETIME: + tuple.append(field.asText()); + break; + case DataType.CHARARRAY: + tuple.append(field.asText()); + break; + default: + // the rest of the data types are currently not supported + } + } + } + return tuple; + } + + public static Schema getPigSchema(String schemaString) { + Schema schema = null; + schemaString = schemaString.replace("/", "_"); + schemaString = "{(" + schemaString + ")}"; + try { + schema = Utils.getSchemaFromString(schemaString); + } catch (ParserException e) { + e.printStackTrace(); + } + return schema; + } + + @Override + public Iterator<AliasTypePair> iterator() { + return tupleSchema.iterator(); + } + + + public static class AliasTypePair { + private final String alias; + private final String type; + + AliasTypePair(String alias, String type) { + this.alias = alias; + this.type = type; + } + + public String getAlias() { + return alias; + } + + public String getType() { + return type; + } + + } + +} diff --git 
a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java new file mode 100644 index 00000000000..017ffcdd215 --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java @@ -0,0 +1,363 @@ +package com.yahoo.vespa.hadoop.pig; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.pig.EvalFunc; +import org.apache.pig.PigWarning; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.joda.time.DateTime; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.*; + +/** + * A Pig UDF to convert simple Pig types into a valid Vespa JSON document format. + * + * @author lesters + */ +public class VespaDocumentOperation extends EvalFunc<String> { + + public enum Operation { + DOCUMENT, + PUT, + ID, + REMOVE, + UPDATE; + + @Override + public String toString() { + return super.toString().toLowerCase(); + } + + public static Operation fromString(String text) { + for (Operation op : Operation.values()) { + if (op.toString().equalsIgnoreCase(text)) { + return op; + } + } + throw new IllegalArgumentException("Unknown operation: " + text); + } + + public static boolean valid(String text) { + for (Operation op : Operation.values()) { + if (op.toString().equalsIgnoreCase(text)) { + return true; + } + } + return false; + } + + } + + private static final String PROPERTY_ID_TEMPLATE = "docid"; + private static final String PROPERTY_OPERATION = "operation"; + private static final String SIMPLE_ARRAY_FIELDS = "simple-array-fields"; + private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields"; + private static final String EXCLUDE_FIELDS = "exclude-fields"; + + private static final String PARTIAL_UPDATE_ASSIGN = "assign"; + + private final String template; + private final Operation operation; + private final Properties properties; + + public VespaDocumentOperation(String... params) { + properties = VespaConfiguration.loadProperties(params); + template = properties.getProperty(PROPERTY_ID_TEMPLATE); + operation = Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put")); + } + + @Override + public String exec(Tuple tuple) throws IOException { + if (tuple == null || tuple.size() == 0) { + return null; + } + if (template == null || template.length() == 0) { + warn("No valid document id template found. Skipping.", PigWarning.UDF_WARNING_1); + return null; + } + if (operation == null) { + warn("No valid operation found. 
Skipping.", PigWarning.UDF_WARNING_1); + return null; + } + + String json = null; + + try { + if (reporter != null) { + reporter.progress(); + } + + Map<String, Object> fields = TupleTools.tupleMap(getInputSchema(), tuple); + String docId = TupleTools.toString(fields, template); + + // create json + json = create(operation, docId, fields, properties); + if (json == null || json.length() == 0) { + warn("No valid document operation could be created.", PigWarning.UDF_WARNING_1); + return null; + } + + + } catch (Exception e) { + StringBuilder sb = new StringBuilder(); + sb.append("Caught exception processing input row: \n"); + sb.append(tuple.toString()); + sb.append("\nException: "); + sb.append(ExceptionUtils.getStackTrace(e)); + warn(sb.toString(), PigWarning.UDF_WARNING_1); + return null; + } + + return json; + } + + + /** + * Create a JSON Vespa document operation given the supplied fields, + * operation and document id template. + * + * @param op Operation (put, remove, update) + * @param docId Document id + * @param fields Fields to put in document operation + * @return A valid JSON Vespa document operation + * @throws IOException ... + */ + public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties) throws IOException { + if (op == null) { + return null; + } + if (docId == null || docId.length() == 0) { + return null; + } + if (fields.isEmpty()) { + return null; + } + + // create json format + ByteArrayOutputStream out = new ByteArrayOutputStream(); + JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8); + g.writeStartObject(); + + g.writeStringField(op.toString(), docId); + + if (op != Operation.REMOVE) { + writeField("fields", fields, DataType.MAP, g, properties, op, 0); + } + + g.writeEndObject(); + g.close(); + + return out.toString(); + } + + + @SuppressWarnings("unchecked") + private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Operation op, int depth) throws IOException { + if (shouldWriteField(name, properties, depth)) { + g.writeFieldName(name); + if (shouldWritePartialUpdate(op, depth)) { + writePartialUpdate(value, type, g, name, properties, op, depth); + } else { + writeValue(value, type, g, name, properties, op, depth); + } + } + } + + @SuppressWarnings("unchecked") + private static void writeValue(Object value, Byte type, JsonGenerator g, String name, Properties properties, Operation op, int depth) throws IOException { + switch (type) { + case DataType.UNKNOWN: + break; + case DataType.NULL: + g.writeNull(); + break; + case DataType.BOOLEAN: + g.writeBoolean((boolean) value); + break; + case DataType.INTEGER: + g.writeNumber((int) value); + break; + case DataType.LONG: + g.writeNumber((long) value); + break; + case DataType.FLOAT: + g.writeNumber((float) value); + break; + case DataType.DOUBLE: + g.writeNumber((double) value); + break; + case DataType.DATETIME: + g.writeNumber(((DateTime) value).getMillis()); + break; + case DataType.BYTEARRAY: + DataByteArray bytes = (DataByteArray) value; + String raw = Base64.getEncoder().encodeToString(bytes.get()); + g.writeString(raw); + break; + case DataType.CHARARRAY: + g.writeString((String) value); + break; + case DataType.BIGINTEGER: + g.writeNumber((BigInteger) value); + break; + case DataType.BIGDECIMAL: + g.writeNumber((BigDecimal) value); + break; + case DataType.MAP: + g.writeStartObject(); + Map<Object, Object> map = (Map<Object, Object>) value; + if (shouldCreateTensor(map, name, properties)) { 
+ writeTensor(map, g); + } else { + for (Map.Entry<Object, Object> entry : map.entrySet()) { + String k = entry.getKey().toString(); + Object v = entry.getValue(); + Byte t = DataType.findType(v); + writeField(k, v, t, g, properties, op, depth+1); + } + } + g.writeEndObject(); + break; + case DataType.TUPLE: + Tuple tuple = (Tuple) value; + boolean writeStartArray = shouldWriteTupleStart(tuple, name, properties); + if (writeStartArray) { + g.writeStartArray(); + } + for (Object v : tuple) { + writeValue(v, DataType.findType(v), g, name, properties, op, depth); + } + if (writeStartArray) { + g.writeEndArray(); + } + break; + case DataType.BAG: + DataBag bag = (DataBag) value; + g.writeStartArray(); + for (Tuple t : bag) { + writeValue(t, DataType.TUPLE, g, name, properties, op, depth); + } + g.writeEndArray(); + break; + } + + } + + private static boolean shouldWritePartialUpdate(Operation op, int depth) { + return op == Operation.UPDATE && depth == 1; + } + + private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Operation op, int depth) throws IOException { + g.writeStartObject(); + g.writeFieldName(PARTIAL_UPDATE_ASSIGN); // TODO: lookup field name in a property to determine correct operation + writeValue(value, type, g, name, properties, op, depth); + g.writeEndObject(); + } + + private static boolean shouldWriteTupleStart(Tuple tuple, String name, Properties properties) { + if (tuple.size() > 1 || properties == null) { + return true; + } + String simpleArrayFields = properties.getProperty(SIMPLE_ARRAY_FIELDS); + if (simpleArrayFields == null) { + return true; + } + if (simpleArrayFields.equals("*")) { + return false; + } + String[] fields = simpleArrayFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return false; + } + } + return true; + } + + private static boolean shouldCreateTensor(Map<Object, Object> map, String name, Properties properties) { + if (properties == null) { + return false; + } + String tensorFields = properties.getProperty(CREATE_TENSOR_FIELDS); + if (tensorFields == null) { + return false; + } + String[] fields = tensorFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + return false; + } + + private static boolean shouldWriteField(String name, Properties properties, int depth) { + if (properties == null || depth != 1) { + return true; + } + String excludeFields = properties.getProperty(EXCLUDE_FIELDS); + if (excludeFields == null) { + return true; + } + String[] fields = excludeFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return false; + } + } + return true; + } + + private static void writeTensor(Map<Object, Object> map, JsonGenerator g) throws IOException { + g.writeFieldName("cells"); + g.writeStartArray(); + for (Map.Entry<Object, Object> entry : map.entrySet()) { + String k = entry.getKey().toString(); + Double v = Double.parseDouble(entry.getValue().toString()); + + g.writeStartObject(); + + // Write address + g.writeFieldName("address"); + g.writeStartObject(); + + String[] dimensions = k.split(","); + for (String dimension : dimensions) { + if (dimension == null || dimension.isEmpty()) { + continue; + } + String[] address = dimension.split(":"); + if (address.length != 2) { + throw new IllegalArgumentException("Malformed cell address: " + dimension); + } + String dim = address[0]; + String label = address[1]; + if (dim == null || 
label == null || dim.isEmpty() || label.isEmpty()) { + throw new IllegalArgumentException("Malformed cell address: " + dimension); + } + g.writeFieldName(dim.trim()); + g.writeString(label.trim()); + } + g.writeEndObject(); + + // Write value + g.writeFieldName("value"); + g.writeNumber(v); + + g.writeEndObject(); + } + g.writeEndArray(); + } + +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java new file mode 100644 index 00000000000..e38e87dda57 --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java @@ -0,0 +1,113 @@ +package com.yahoo.vespa.hadoop.pig; + +import com.fasterxml.jackson.databind.JsonNode; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaHttpClient; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaQuerySchema; +import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools; +import org.apache.pig.EvalFunc; +import org.apache.pig.data.*; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.util.UDFContext; + +import java.io.IOException; +import java.util.*; + +/** + * A Pig UDF to run a query against a Vespa cluster and return the + * results. + * + * @author lesters + */ +public class VespaQuery extends EvalFunc<DataBag> { + + private final String PROPERTY_QUERY_TEMPLATE = "query"; + private final String PROPERTY_QUERY_SCHEMA = "schema"; + private final String PROPERTY_ROOT_NODE = "rootnode"; + + private final VespaConfiguration configuration; + private final Properties properties; + private final String queryTemplate; + private final String querySchema; + private final String queryRootNode; + + private VespaHttpClient httpClient; + + public VespaQuery() { + this(new String[0]); + } + + public VespaQuery(String... params) { + configuration = VespaConfiguration.get(UDFContext.getUDFContext().getJobConf(), null); + properties = VespaConfiguration.loadProperties(params); + + queryTemplate = properties.getProperty(PROPERTY_QUERY_TEMPLATE); + if (queryTemplate == null || queryTemplate.isEmpty()) { + throw new IllegalArgumentException("Query template cannot be empty"); + } + + querySchema = properties.getProperty(PROPERTY_QUERY_SCHEMA, "rank:int,id:chararray"); + queryRootNode = properties.getProperty(PROPERTY_ROOT_NODE, "root/children"); + } + + @Override + public DataBag exec(Tuple input) throws IOException { + if (input == null || input.size() == 0) { + return null; + } + JsonNode jsonResult = queryVespa(input); + if (jsonResult == null) { + return null; + } + return createPigRepresentation(jsonResult); + } + + @Override + public Schema outputSchema(Schema input) { + return VespaQuerySchema.getPigSchema(querySchema); + } + + + private JsonNode queryVespa(Tuple input) throws IOException { + String url = createVespaQueryUrl(input); + if (url == null) { + return null; + } + String result = executeVespaQuery(url); + return parseVespaResultJson(result); + } + + + private String createVespaQueryUrl(Tuple input) throws IOException { + return TupleTools.toString(getInputSchema(), input, queryTemplate); + } + + + private String executeVespaQuery(String url) throws IOException { + if (httpClient == null) { + httpClient = new VespaHttpClient(configuration); + } + return httpClient.get(url); + } + + private JsonNode parseVespaResultJson(String result) throws IOException { + return httpClient == null ? 
null : httpClient.parseResultJson(result, queryRootNode); + } + + private DataBag createPigRepresentation(JsonNode hits) { + DataBag bag = new SortedDataBag(null); + VespaQuerySchema querySchema = new VespaQuerySchema(this.querySchema); + + for (int rank = 0; rank < hits.size(); ++rank) { + JsonNode hit = hits.get(rank); + Tuple tuple = querySchema.buildTuple(rank, hit); + bag.add(tuple); + } + + return bag; + } + + + + +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java new file mode 100644 index 00000000000..66f04be657f --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java @@ -0,0 +1,62 @@ +package com.yahoo.vespa.hadoop.pig; + +import com.yahoo.vespa.hadoop.mapreduce.VespaSimpleJsonInputFormat; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.pig.LoadFunc; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; + +import java.io.IOException; + +/** + * Simple JSON loader which loads either one JSON object per line or a + * multiline JSON consisting of objects in an array. + * + * Returns only the textual representation of the JSON object. + * + * @author lesters + */ +@SuppressWarnings("rawtypes") +public class VespaSimpleJsonLoader extends LoadFunc { + + private TupleFactory tupleFactory = TupleFactory.getInstance(); + private VespaSimpleJsonInputFormat.VespaJsonRecordReader recordReader; + + @Override + public void setLocation(String location, Job job) throws IOException { + FileInputFormat.setInputPaths(job, location); + } + + @Override + public InputFormat getInputFormat() throws IOException { + return new VespaSimpleJsonInputFormat(); + } + + @Override + public void prepareToRead(RecordReader reader, PigSplit split) throws IOException { + recordReader = (VespaSimpleJsonInputFormat.VespaJsonRecordReader) reader; + } + + @Override + public Tuple getNext() throws IOException { + try { + boolean done = recordReader.nextKeyValue(); + if (done) { + return null; + } + Text json = recordReader.getCurrentKey(); + if (json == null) { + return null; + } + return tupleFactory.newTuple(json.toString()); + + } catch (InterruptedException ex) { + return null; + } + } +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java new file mode 100644 index 00000000000..d5000b2b328 --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java @@ -0,0 +1,187 @@ +package com.yahoo.vespa.hadoop.pig; + +import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat; +import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.pig.ResourceSchema; +import org.apache.pig.StoreFunc; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.util.UDFContext; + +import java.io.*; +import java.util.Base64; +import java.util.Map; +import 
java.util.Properties; + +/** + * A small Pig UDF wrapper around the Vespa http client for + * feeding data into a Vespa endpoint. + * + * @author lesters + */ +@SuppressWarnings("rawtypes") +public class VespaStorage extends StoreFunc { + + private final boolean createDocOp; + private final String template; + private final VespaDocumentOperation.Operation operation; + + private String signature = null; + private RecordWriter recordWriter = null; + private ResourceSchema resourceSchema = null; + private Properties properties = new Properties(); + + private static final String PROPERTY_CREATE_DOC_OP = "create-document-operation"; + private static final String PROPERTY_ID_TEMPLATE = "docid"; + private static final String PROPERTY_OPERATION = "operation"; + private static final String PROPERTY_RESOURCE_SCHEMA = "resource_schema"; + + public VespaStorage() { + createDocOp = false; + template = null; + operation = null; + } + + public VespaStorage(String... params) { + properties = VespaConfiguration.loadProperties(params); + createDocOp = Boolean.parseBoolean(properties.getProperty(PROPERTY_CREATE_DOC_OP, "false")); + operation = VespaDocumentOperation.Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put")); + template = properties.getProperty(PROPERTY_ID_TEMPLATE); + } + + + @Override + public OutputFormat getOutputFormat() throws IOException { + return new VespaOutputFormat(properties); + } + + + @Override + public void setStoreLocation(String endpoint, Job job) throws IOException { + properties.setProperty(VespaConfiguration.ENDPOINT, endpoint); + } + + + @Override + public void prepareToWrite(RecordWriter recordWriter) throws IOException { + this.recordWriter = recordWriter; + this.resourceSchema = getResourceSchema(); + } + + + @SuppressWarnings("unchecked") + @Override + public void putNext(Tuple tuple) throws IOException { + if (tuple == null || tuple.size() == 0) { + return; + } + + String data = null; + if (createDocOp) { + data = createDocumentOperation(tuple); + } else if (!tuple.isNull(0)) { + data = tuple.get(0).toString(); // assume single field with correctly formatted doc op. 
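// For reference, a correctly formatted operation is a JSON object keyed by the operation name, e.g.
// {"put": "id:testapp:metrics::clicks-20160112", "fields": {...}} (id taken from the tests below);
// XML input is also accepted when vespa.feed.data.format is set to xml.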
+ } + + if (data == null || data.length() == 0) { + return; + } + + try { + recordWriter.write(0, data); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + + @Override + public void checkSchema(ResourceSchema resourceSchema) throws IOException { + setResourceSchema(resourceSchema); + } + + + @Override + public String relToAbsPathForStoreLocation(String endpoint, Path path) throws IOException { + return endpoint; + } + + + @Override + public void setStoreFuncUDFContextSignature(String s) { + this.signature = s; + } + + + @Override + public void cleanupOnFailure(String s, Job job) throws IOException { + } + + + @Override + public void cleanupOnSuccess(String s, Job job) throws IOException { + } + + + private ResourceSchema getResourceSchema() throws IOException { + Properties properties = getUDFProperties(); + return base64Deserialize(properties.getProperty(PROPERTY_RESOURCE_SCHEMA)); + } + + + private void setResourceSchema(ResourceSchema schema) throws IOException { + Properties properties = getUDFProperties(); + if (properties.getProperty(PROPERTY_RESOURCE_SCHEMA) == null) { + properties.setProperty(PROPERTY_RESOURCE_SCHEMA, base64Serialize(schema)); + } + } + + + private Properties getUDFProperties() { + String[] context = { signature }; + return UDFContext.getUDFContext().getUDFProperties(getClass(), context); + } + + + private String createDocumentOperation(Tuple tuple) throws IOException { + if (tuple == null || tuple.size() == 0) { + return null; + } + if (resourceSchema == null) { + return null; + } + + Map<String, Object> fields = TupleTools.tupleMap(resourceSchema, tuple); + String docId = TupleTools.toString(fields, template); + + return VespaDocumentOperation.create(operation, docId, fields, properties); + } + + + public static String base64Serialize(Object o) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(o); + } + return Base64.getEncoder().encodeToString(baos.toByteArray()); + } + + + @SuppressWarnings("unchecked") + public static <T extends Serializable> T base64Deserialize(String s) throws IOException { + Object ret; + byte[] data = Base64.getDecoder().decode(s); + ByteArrayInputStream bais = new ByteArrayInputStream(data); + try (ObjectInputStream ois = new ObjectInputStream(bais)) { + ret = ois.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + return (T) ret; + } + +} diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java new file mode 100644 index 00000000000..a79f63a77ce --- /dev/null +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java @@ -0,0 +1,197 @@ +package com.yahoo.vespa.hadoop.pig; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; +import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import 
org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.test.PathUtils; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.*; +import java.util.*; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class MapReduceTest { + + protected static File hdfsBaseDir; + protected static FileSystem hdfs; + protected static Configuration conf; + protected static MiniDFSCluster cluster; + + protected static Path metricsJsonPath; + protected static Path metricsCsvPath; + + @BeforeClass + public static void setUp() throws IOException { + hdfsBaseDir = new File(PathUtils.getTestDir(MapReduceTest.class).getCanonicalPath()); + + conf = new HdfsConfiguration(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBaseDir.getAbsolutePath()); + conf.set(VespaConfiguration.DRYRUN, "true"); + conf.set(VespaConfiguration.ENDPOINT, "endpoint-does-not-matter-in-dryrun"); + + cluster = new MiniDFSCluster.Builder(conf).build(); + hdfs = FileSystem.get(conf); + + metricsJsonPath = new Path("metrics_json"); + metricsCsvPath = new Path("metrics_csv"); + copyToHdfs("src/test/resources/operations_data.json", metricsJsonPath, "data"); + copyToHdfs("src/test/resources/tabular_data.csv", metricsCsvPath, "data"); + } + + @AfterClass + public static void tearDown() throws IOException { + Path testDir = new Path(hdfsBaseDir.getParent()); + hdfs.delete(testDir, true); + cluster.shutdown(); + LocalFileSystem localFileSystem = FileSystem.getLocal(conf); + localFileSystem.delete(testDir, true); + } + + @Test + public void requireThatMapOnlyJobSucceeds() throws Exception { + Job job = Job.getInstance(conf); + job.setJarByClass(MapReduceTest.class); + job.setMapperClass(FeedMapper.class); + job.setOutputFormatClass(VespaOutputFormat.class); + job.setMapOutputValueClass(Text.class); + + FileInputFormat.setInputPaths(job, metricsJsonPath); + + boolean success = job.waitForCompletion(true); + assertTrue("Job Failed", success); + + VespaCounters counters = VespaCounters.get(job); + assertEquals(10, counters.getDocumentsSent()); + assertEquals(0, counters.getDocumentsFailed()); + assertEquals(10, counters.getDocumentsOk()); + } + + @Test + public void requireThatMapReduceJobSucceeds() throws Exception { + Job job = Job.getInstance(conf); + job.setJarByClass(MapReduceTest.class); + job.setMapperClass(FeedMapper.class); + job.setOutputFormatClass(VespaOutputFormat.class); + job.setMapOutputValueClass(Text.class); + job.setReducerClass(FeedReducer.class); + job.setNumReduceTasks(1); + + FileInputFormat.setInputPaths(job, metricsJsonPath); + + boolean success = job.waitForCompletion(true); + assertTrue("Job Failed", success); + + VespaCounters counters = VespaCounters.get(job); + assertEquals(10, counters.getDocumentsSent()); + assertEquals(0, counters.getDocumentsFailed()); + assertEquals(10, counters.getDocumentsOk()); + } + + + @Test + public void requireThatTransformMapJobSucceeds() throws Exception { + Job job = Job.getInstance(conf); + job.setJarByClass(MapReduceTest.class); + job.setMapperClass(ParsingMapper.class); + job.setOutputFormatClass(VespaOutputFormat.class); + job.setMapOutputValueClass(Text.class); + job.setReducerClass(FeedReducer.class); + 
job.setNumReduceTasks(1);
+
+        FileInputFormat.setInputPaths(job, metricsCsvPath);
+
+        boolean success = job.waitForCompletion(true);
+        assertTrue("Job Failed", success);
+
+        VespaCounters counters = VespaCounters.get(job);
+        assertEquals(10, counters.getDocumentsSent());
+        assertEquals(0, counters.getDocumentsFailed());
+        assertEquals(10, counters.getDocumentsOk());
+        assertEquals(0, counters.getDocumentsSkipped());
+    }
+
+
+    private static void copyToHdfs(String localFile, Path hdfsDir, String hdfsName) throws IOException {
+        Path hdfsPath = new Path(hdfsDir, hdfsName);
+
+        // Try-with-resources closes both streams, also when the copy fails midway
+        try (InputStream in = new BufferedInputStream(new FileInputStream(localFile));
+             FSDataOutputStream out = hdfs.create(hdfsPath)) {
+            int len;
+            byte[] buffer = new byte[1024];
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+        }
+    }
+
+    public static class FeedMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            context.write(key, value);
+        }
+    }
+
+    public static class FeedReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
+        // Match the Reducer.reduce signature so this method actually overrides the default
+        // pass-through implementation instead of being dead code
+        public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+            for (Text value : values) {
+                context.write(key, value);
+            }
+        }
+    }
+
+    public static class ParsingMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            String line = value.toString();
+            if (line == null || line.length() == 0)
+                return;
+
+            StringTokenizer tokenizer = new StringTokenizer(line);
+            long date = Long.parseLong(tokenizer.nextToken());
+            String metricName = tokenizer.nextToken();
+            long metricValue = Long.parseLong(tokenizer.nextToken());
+            String application = tokenizer.nextToken();
+
+            String docid = "id:" + application + ":metric::" + metricName + "-" + date;
+
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+
+            g.writeStartObject();
+            g.writeObjectFieldStart("fields");
+            g.writeNumberField("date", date);
+            g.writeStringField("name", metricName);
+            g.writeNumberField("value", metricValue);
+            g.writeStringField("application", application);
+            g.writeEndObject();
+            g.writeStringField("put", docid);
+            g.writeEndObject();
+            g.close();
+
+            // The generator wrote UTF-8 bytes, so build the Text from the raw bytes
+            // rather than round-tripping through the platform default charset
+            context.write(key, new Text(out.toByteArray()));
+        }
+    }
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java new file mode 100644 index 00000000000..0e1a8fba17c --- /dev/null +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java @@ -0,0 +1,274 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+@SuppressWarnings("serial")
+public class VespaDocumentOperationTest {
+
+    @Test
+    public void requireThatUDFReturnsCorrectJson() throws Exception {
+        String json = 
getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>"); + ObjectMapper m = new ObjectMapper(); + JsonNode root = m.readTree(json); + JsonNode fields = root.path("fields"); + + // operation put is default + assertEquals("id:testapp:metrics::clicks-20160112", root.get("put").getTextValue()); + assertEquals("testapp", fields.get("application").getTextValue()); + assertEquals("clicks", fields.get("name").getTextValue()); + assertEquals(3, fields.get("value").getIntValue()); + } + + + @Test + public void requireThatUDFSupportsUpdateAssign() throws IOException { + String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "operation=update"); + ObjectMapper m = new ObjectMapper(); + JsonNode root = m.readTree(json); + JsonNode fields = root.path("fields"); + + assertEquals("id:testapp:metrics::clicks-20160112", root.get("update").getTextValue()); + assertEquals("testapp", fields.get("application").get("assign").getTextValue()); + assertEquals("clicks", fields.get("name").get("assign").getTextValue()); + assertEquals(3, fields.get("value").get("assign").getIntValue()); + } + + + @Test + public void requireThatUDFReturnsNullForMissingConfig() throws Exception { + String json = getDocumentOperationJson(); + assertNull(json); + } + + + @Test + public void requireThatUDFCorrectlyGeneratesRemoveOperation() throws Exception { + String json = getDocumentOperationJson("operation=remove", "docid=id:<application>:metrics::<name>-<date>"); + ObjectMapper m = new ObjectMapper(); + JsonNode root = m.readTree(json); + JsonNode fields = root.get("fields"); + + assertEquals("id:testapp:metrics::clicks-20160112", root.get("remove").getTextValue()); + assertNull(fields); + } + + + @Test + public void requireThatUDFGeneratesComplexDataTypes() throws Exception { + Schema schema = new Schema(); + Tuple tuple = TupleFactory.getInstance().newTuple(); + + Tuple intTuple = TupleFactory.getInstance().newTuple(); + int[] intArray = {1, 2, 3}; + for (int i : intArray) { intTuple.append(i); } + + Tuple stringTuple = TupleFactory.getInstance().newTuple(); + String[] stringArray = {"a", "b", "c"}; + for (String s : stringArray) { stringTuple.append(s); } + + DataBag bag = new SortedDataBag(null); + bag.add(intTuple); + bag.add(stringTuple); + + Map<String, Object> innerMap = new HashMap<String, Object>() {{ + put("a", "string"); + put("tuple", intTuple); + }}; + + DataByteArray bytes = new DataByteArray("testdata".getBytes()); + + Map<String, Object> outerMap = new HashMap<String, Object>() {{ + put("string", "value"); + put("int", 3); + put("float", 3.145); + put("bool", true); + put("byte", bytes); + put("map", innerMap); + put("bag", bag); + }}; + + addToTuple("map", DataType.MAP, outerMap, schema, tuple); + + VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty"); + docOp.setInputSchema(schema); + String json = docOp.exec(tuple); + + ObjectMapper m = new ObjectMapper(); + JsonNode root = m.readTree(json); + JsonNode fields = root.get("fields"); + JsonNode map = fields.get("map"); + + assertEquals("value", map.get("string").getTextValue()); + assertEquals(3, map.get("int").getIntValue()); + assertEquals(3.145, map.get("float").getDoubleValue(), 1e-6); + assertEquals(true, map.get("bool").getBooleanValue()); + assertEquals("dGVzdGRhdGE=", map.get("byte").getTextValue()); + + assertEquals("string", map.get("map").get("a").getTextValue()); + for (int i = 0; i < intArray.length; ++i) { + assertEquals(intArray[i], map.get("map").get("tuple").get(i).asInt()); + } + 
+ JsonNode bagField = map.get("bag"); + for (int i = 0; i < intArray.length; ++i) { + assertEquals(intArray[i], bagField.get(0).get(i).asInt()); + } + for (int i = 0; i < stringArray.length; ++i) { + assertEquals(stringArray[i], bagField.get(1).get(i).asText()); + } + } + + + @Test + public void requireThatSimpleArraysMustBeConfigured() throws Exception { + String[] stringArray = {"a", "b", "c"}; + JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty"); // simple arrays not configured + // json: [["a"], ["b"], ["c"]] + assertEquals("a", array.get(0).get(0).asText()); + assertEquals("b", array.get(1).get(0).asText()); + assertEquals("c", array.get(2).get(0).asText()); + } + + + @Test + public void requireThatSimpleArraysAreSupported() throws Exception { + String[] stringArray = {"a", "b", "c"}; + JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=array"); + // json: ["a", "b", "c"] + assertEquals("a", array.get(0).asText()); + assertEquals("b", array.get(1).asText()); + assertEquals("c", array.get(2).asText()); + } + + + @Test + public void requireThatSimpleArraysCanBeConfiguredWithWildcard() throws Exception { + String[] stringArray = {"a", "b", "c"}; + JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=*"); + // json: ["a", "b", "c"] + assertEquals("a", array.get(0).asText()); + assertEquals("b", array.get(1).asText()); + assertEquals("c", array.get(2).asText()); + } + + + @Test + public void requireThatMultipleSimpleArraysAreSupported() throws Exception { + String[] stringArray = {"a", "b", "c"}; + JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=empty,array"); + // json: ["a", "b", "c"] + assertEquals("a", array.get(0).asText()); + assertEquals("b", array.get(1).asText()); + assertEquals("c", array.get(2).asText()); + } + + + private JsonNode setupSimpleArrayOperation(String name, String[] array, String... 
params) throws IOException {
+        Schema schema = new Schema();
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+
+        DataBag bag = new SortedDataBag(null);
+        for (String s : array) {
+            Tuple stringTuple = TupleFactory.getInstance().newTuple();
+            stringTuple.append(s);
+            bag.add(stringTuple);
+        }
+        addToTuple(name, DataType.BAG, bag, schema, tuple);
+
+        VespaDocumentOperation docOp = new VespaDocumentOperation(params);
+        docOp.setInputSchema(schema);
+        String json = docOp.exec(tuple);
+
+        ObjectMapper m = new ObjectMapper();
+        JsonNode root = m.readTree(json);
+        JsonNode fields = root.get("fields");
+        return fields.get(name);
+    }
+
+
+    @Test
+    public void requireThatUDFSupportsTensors() throws IOException {
+        Schema schema = new Schema();
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+
+        // Tensor cells are given as a map from a comma-separated address string
+        // ("dim:label,dim:label,...") to the cell's double value; see the Vespa
+        // tensor format documentation for details
+        Map<String, Double> tensor = new HashMap<String, Double>() {{
+            put("x:label1,y:label2,z:label4", 2.0);
+            put("x:label3", 3.0);
+        }};
+
+        addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
+        addToTuple("tensor", DataType.MAP, tensor, schema, tuple);
+
+        VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "create-tensor-fields=tensor");
+        docOp.setInputSchema(schema);
+        String json = docOp.exec(tuple);
+
+        ObjectMapper m = new ObjectMapper();
+        JsonNode root = m.readTree(json);
+        JsonNode fields = root.get("fields");
+        JsonNode tensorNode = fields.get("tensor");
+        JsonNode cells = tensorNode.get("cells");
+
+        assertEquals("label1", cells.get(0).get("address").get("x").asText());
+        assertEquals("label2", cells.get(0).get("address").get("y").asText());
+        assertEquals("label4", cells.get(0).get("address").get("z").asText());
+        assertEquals("label3", cells.get(1).get("address").get("x").asText());
+
+        assertEquals(2.0, cells.get(0).get("value").asDouble(), 1e-6);
+        assertEquals(3.0, cells.get(1).get("value").asDouble(), 1e-6);
+    }
+
+
+    @Test
+    public void requireThatUDFCanExcludeFields() throws IOException {
+        String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "exclude-fields=application,date");
+        ObjectMapper m = new ObjectMapper();
+        JsonNode root = m.readTree(json);
+        JsonNode fields = root.path("fields");
+
+        // The 'application' and 'date' fields should not appear in the JSON
+        assertNull(fields.get("application"));
+        assertNull(fields.get("date"));
+        assertNotNull(fields.get("name"));
+        assertNotNull(fields.get("value"));
+    }
+
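+
+    // For illustration: with params like "docid=id:<application>:metrics::<name>-<date>",
+    // the helper below should yield a put operation roughly like this (field order may
+    // vary), matching the values asserted in requireThatUDFReturnsCorrectJson above:
+    //
+    //   {
+    //       "put": "id:testapp:metrics::clicks-20160112",
+    //       "fields": {
+    //           "application": "testapp",
+    //           "name": "clicks",
+    //           "date": "20160112",
+    //           "value": 3
+    //       }
+    //   }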
+    private String getDocumentOperationJson(String... params) throws IOException {
+        Schema schema = new Schema();
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+
+        addToTuple("application", DataType.CHARARRAY, "testapp", schema, tuple);
+        addToTuple("name", DataType.CHARARRAY, "clicks", schema, tuple);
+        addToTuple("date", DataType.CHARARRAY, "20160112", schema, tuple);
+        addToTuple("value", DataType.INTEGER, 3, schema, tuple);  // the value is an int, so declare it as such
+
+        VespaDocumentOperation docOp = new VespaDocumentOperation(params);
+        docOp.setInputSchema(schema);
+        return docOp.exec(tuple);
+    }
+
+
+    private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) {
+        schema.add(new Schema.FieldSchema(alias, type));
+        tuple.append(value);
+    }
+
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java new file mode 100644 index 00000000000..0f123904cfb --- /dev/null +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java @@ -0,0 +1,96 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.sun.net.httpserver.HttpServer;
+import com.yahoo.vespa.hadoop.util.MockQueryHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class VespaQueryTest {
+
+    @Test
+    public void requireThatQueriesAreReturnedCorrectly() throws Exception {
+        runQueryTest("src/test/pig/query.pig", createQueryHandler(""), 18901);
+    }
+
+    @Test
+    public void requireThatQueriesAreReturnedCorrectlyWithAlternativeJsonRoot() throws Exception {
+        runQueryTest("src/test/pig/query_alt_root.pig", createQueryHandler("children"), 18902);
+    }
+
+    private void runQueryTest(String script, MockQueryHandler queryHandler, int port) throws Exception {
+        final String endpoint = "http://localhost:" + port;
+
+        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
+        server.createContext("/", queryHandler);
+        server.start();
+
+        PigServer ps = setup(script, endpoint);
+
+        Iterator<Tuple> recommendations = ps.openIterator("recommendations");
+        while (recommendations.hasNext()) {
+            Tuple tuple = recommendations.next();
+
+            String userid = (String) tuple.get(0);
+            Integer rank = (Integer) tuple.get(1);
+            String docid = (String) tuple.get(2);
+            Double relevance = (Double) tuple.get(3);
+            String fieldId = (String) tuple.get(4);
+            String fieldContent = (String) tuple.get(5);
+
+            MockQueryHandler.MockQueryHit hit = queryHandler.getHit(userid, rank);
+            assertEquals(docid, hit.id);
+            assertEquals(relevance, hit.relevance, 1e-3);
+            assertEquals(fieldId, hit.fieldId);
+            assertEquals(fieldContent, hit.fieldContent);
+        }
+
+        // HttpServer.create either returns a server or throws, so no null check is needed here
+        server.stop(0);
+    }
+
+    private PigServer setup(String script, String endpoint) throws Exception {
+        Configuration conf = new HdfsConfiguration();
+        Map<String, String> parameters = new HashMap<>();
+        parameters.put("ENDPOINT", endpoint);
+
+        PigServer ps = new PigServer(ExecType.LOCAL, conf);
+        ps.setBatchOn();
+        ps.registerScript(script, parameters);
+
+        return ps;
+    }
+
+    private MockQueryHandler createQueryHandler(String childNode) {
+        MockQueryHandler queryHandler = new MockQueryHandler(childNode);
+
+        List<String> userIds = Arrays.asList("5", "104", "313");
+
+        int hitsPerUser = 3;
+        for (int i = 
0; i < hitsPerUser * userIds.size(); ++i) { + String id = "" + (i+1); + String userId = userIds.get(i / hitsPerUser); + queryHandler.newHit(). + setId("id::::" + id). + setRelevance(1.0 - (i % hitsPerUser) * 0.1). + setFieldSddocname("doctype"). + setFieldId("" + id). + setFieldDate("2016060" + id). + setFieldContent("Content for user " + userId + " hit " + i % hitsPerUser + "..."). + add(userId); + } + + return queryHandler; + } + +} diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java new file mode 100644 index 00000000000..322c729b8c5 --- /dev/null +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java @@ -0,0 +1,110 @@ +package com.yahoo.vespa.hadoop.pig; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.mapred.Counters; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecJob; +import org.apache.pig.tools.pigstats.JobStats; +import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; +import org.junit.Test; + +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; + + +public class VespaStorageTest { + + @Test + public void requireThatPremadeXmlOperationsFeedSucceeds() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.set(VespaConfiguration.DATA_FORMAT, "xml"); + assertAllDocumentsOk("src/test/pig/feed_operations_xml.pig", conf); + } + + + @Test + public void requireThatPremadeOperationsFeedSucceeds() throws Exception { + assertAllDocumentsOk("src/test/pig/feed_operations.pig"); + } + + + @Test + public void requireThatPremadeMultilineOperationsFeedSucceeds() throws Exception { + assertAllDocumentsOk("src/test/pig/feed_multiline_operations.pig"); + } + + + @Test + public void requireThatPremadeOperationsWithJsonLoaderFeedSucceeds() throws Exception { + assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig"); + } + + + @Test + public void requireThatCreateOperationsFeedSucceeds() throws Exception { + assertAllDocumentsOk("src/test/pig/feed_create_operations.pig"); + } + + + @Test + public void requireThatCreateOperationsShortFormFeedSucceeds() throws Exception { + assertAllDocumentsOk("src/test/pig/feed_create_operations_short_form.pig"); + } + + + @Test + public void requireThatFeedVisitDataSucceeds() throws Exception { + assertAllDocumentsOk("src/test/pig/feed_visit_data.pig"); + } + + + private PigServer setup(String script, Configuration conf) throws Exception { + if (conf == null) { + conf = new HdfsConfiguration(); + } + conf.setIfUnset(VespaConfiguration.DRYRUN, "true"); + conf.setIfUnset(VespaConfiguration.ENDPOINT, "dummy-endpoint"); + + // Parameter substitutions - can also be set by configuration + Map<String, String> parameters = new HashMap<>(); + parameters.put("ENDPOINT", "endpoint-does-not-matter-in-dryrun,another-endpoint-that-does-not-matter"); + + PigServer ps = new PigServer(ExecType.LOCAL, conf); + ps.setBatchOn(); + ps.registerScript(script, parameters); + + return ps; + } + + + private void assertAllDocumentsOk(String script) 
throws Exception {
+        assertAllDocumentsOk(script, null);
+    }
+
+
+    private void assertAllDocumentsOk(String script, Configuration conf) throws Exception {
+        PigServer ps = setup(script, conf);
+        List<ExecJob> jobs = ps.executeBatch();
+        PigStats stats = jobs.get(0).getStatistics();
+        for (JobStats js : stats.getJobGraph()) {
+            Counters hadoopCounters = ((MRJobStats) js).getHadoopCounters();
+            assertNotNull(hadoopCounters);
+            VespaCounters counters = VespaCounters.get(hadoopCounters);
+            assertEquals(10, counters.getDocumentsSent());
+            assertEquals(0, counters.getDocumentsFailed());
+            assertEquals(10, counters.getDocumentsOk());
+        }
+    }
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java new file mode 100644 index 00000000000..0bf9f6b447e --- /dev/null +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java @@ -0,0 +1,218 @@
+package com.yahoo.vespa.hadoop.util;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MockQueryHandler implements HttpHandler {
+
+    private final Map<String, List<MockQueryHit>> hitMap;
+    private final String childNode;
+
+    public MockQueryHandler(String childNode) {
+        this.hitMap = new HashMap<>();
+        this.childNode = childNode;
+    }
+
+    public void handle(HttpExchange t) throws IOException {
+        URI uri = t.getRequestURI();
+        String query = uri.getQuery();
+        String response = null;
+
+        // Parse the query string - extract the "query" parameter and look up its canned result
+        if (query != null) {
+            String[] params = query.split("[&]");
+            for (String param : params) {
+                int i = param.indexOf('=');
+                String name = param.substring(0, i);
+                String value = URLDecoder.decode(param.substring(i + 1), "UTF-8");
+
+                if ("query".equalsIgnoreCase(name)) {
+                    response = getResponse(value);
+                }
+            }
+        }
+
+        // The content length must be given in bytes, not characters
+        byte[] responseBytes = response == null ? new byte[0] : response.getBytes("UTF-8");
+        t.sendResponseHeaders(200, responseBytes.length);
+        OutputStream os = t.getResponseBody();
+        os.write(responseBytes);
+        os.close();
+    }
+
+    public MockQueryHit getHit(String query, Integer rank) {
+        if (!hitMap.containsKey(query)) {
+            return null;
+        }
+        if (rank >= hitMap.get(query).size()) {
+            return null;
+        }
+        return hitMap.get(query).get(rank);
+    }
+
+    public MockQueryHit newHit() {
+        return new MockQueryHit(this);
+    }
+
+    public void addHit(String query, MockQueryHit hit) {
+        if (!hitMap.containsKey(query)) {
+            hitMap.put(query, new ArrayList<>());
+        }
+        hitMap.get(query).add(hit);
+    }
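+
+    // For illustration: getResponse below serializes hits in (a mock subset of) the Vespa
+    // default JSON result format, roughly shaped like this sketch, with details elided:
+    //
+    //   {"root": {
+    //       "id": "toplevel",
+    //       "relevance": 1,
+    //       "fields": {"totalCount": <number of hits>},
+    //       "coverage": {"coverage": 100},
+    //       "children": [ <one object per hit> ]
+    //   }}
+    //
+    // When a non-empty childNode is configured, the hits are nested one level deeper under
+    // that field name, which is what the alternative-json-root query test exercises.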
"".getBytes() : response.getBytes()); + os.close(); + + } + + public MockQueryHit getHit(String query, Integer rank) { + if (!hitMap.containsKey(query)) { + return null; + } + if (rank >= hitMap.get(query).size()) { + return null; + } + return hitMap.get(query).get(rank); + } + + public MockQueryHit newHit() { + return new MockQueryHit(this); + } + + public void addHit(String query, MockQueryHit hit) { + if (!hitMap.containsKey(query)) { + hitMap.put(query, new ArrayList<>()); + } + hitMap.get(query).add(hit); + } + + private String getResponse(String query) throws IOException { + List<MockQueryHit> hits = hitMap.get(query); + if (hits == null) { + return null; + } + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8); + + writeResultStart(g, hits.size()); + for (MockQueryHit hit : hits) { + writeHit(g, hit); + } + writeResultsEnd(g); + g.close(); + + return out.toString(); + } + + private void writeHit(JsonGenerator g, MockQueryHit hit) throws IOException { + g.writeStartObject(); + + g.writeFieldName("id"); + g.writeString(hit.id); + + g.writeFieldName("relevance"); + g.writeNumber(hit.relevance); + + g.writeFieldName("fields"); + g.writeStartObject(); + + g.writeFieldName("sddocname"); + g.writeString(hit.fieldSddocname); + + g.writeFieldName("date"); + g.writeString(hit.fieldDate); + + g.writeFieldName("content"); + g.writeString(hit.fieldContent); + + g.writeFieldName("id"); + g.writeString(hit.fieldId); + + g.writeEndObject(); + g.writeEndObject(); + } + + private void writeResultStart(JsonGenerator g, int count) throws IOException { + g.writeStartObject(); + g.writeFieldName("root"); + + g.writeStartObject(); + + g.writeFieldName("id"); + g.writeString("toplevel"); + + g.writeFieldName("relevance"); + g.writeNumber(1); + + g.writeFieldName("fields"); + g.writeStartObject(); + g.writeFieldName("totalCount"); + g.writeNumber(count); + g.writeEndObject(); + + g.writeFieldName("coverage"); + g.writeStartObject(); + g.writeFieldName("coverage"); + g.writeNumber(100); + // ... 
more stuff here usually + g.writeEndObject(); + + g.writeFieldName("children"); + g.writeStartArray(); + + if (!childNode.isEmpty()) { + g.writeStartObject(); + g.writeFieldName(childNode); + g.writeStartArray(); + } + } + + private void writeResultsEnd(JsonGenerator g) throws IOException { + if (!childNode.isEmpty()) { + g.writeEndArray(); + g.writeEndObject(); + } + g.writeEndArray(); + g.writeEndObject(); + g.writeEndObject(); + } + + public static class MockQueryHit { + + private final MockQueryHandler handler; + + public String id; + public Double relevance; + public String fieldSddocname; + public String fieldDate; + public String fieldContent; + public String fieldId; + + private MockQueryHit(MockQueryHandler handler) { + this.handler = handler; + } + + public void add(String query) { + handler.addHit(query, this); + } + + public MockQueryHit setId(String id) { + this.id = id; + return this; + } + + public MockQueryHit setRelevance(Double relevance) { + this.relevance = relevance; + return this; + } + + public MockQueryHit setFieldSddocname(String fieldSddocname) { + this.fieldSddocname = fieldSddocname; + return this; + } + + public MockQueryHit setFieldDate(String fieldDate) { + this.fieldDate = fieldDate; + return this; + } + + public MockQueryHit setFieldContent(String fieldContent) { + this.fieldContent = fieldContent; + return this; + } + + public MockQueryHit setFieldId(String fieldId) { + this.fieldId = fieldId; + return this; + } + } + +} diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java new file mode 100644 index 00000000000..c98e7b1c02c --- /dev/null +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java @@ -0,0 +1,50 @@ +package com.yahoo.vespa.hadoop.util; + +import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class TupleToolsTest { + + @Test + public void requireThatTupleToStringHandlesSimpleTypes() throws IOException { + Schema schema = new Schema(); + Tuple tuple = TupleFactory.getInstance().newTuple(); + + addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); + addToTuple("rank", DataType.INTEGER, 1, schema, tuple); + + String template = "Id is <id> and rank is <rank>"; + String result = TupleTools.toString(schema, tuple, template); + + assertEquals("Id is 123 and rank is 1", result); + } + + + private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) { + schema.add(new Schema.FieldSchema(alias, type)); + tuple.append(value); + } + + @Test + public void requireThatTupleToStringHandlesStringCharacters() throws IOException { + Schema schema = new Schema(); + Tuple tuple = TupleFactory.getInstance().newTuple(); + + addToTuple("id", DataType.CHARARRAY, "_!@#$%^&*()", schema, tuple); + addToTuple("rank", DataType.INTEGER, 1, schema, tuple); + + String template = "Id is <id> and rank is <rank>"; + String result = TupleTools.toString(schema, tuple, template); + + assertEquals("Id is _!@#$%^&*() and rank is 1", result); + } + +} diff --git a/vespa-hadoop/src/test/pig/feed_create_operations.pig b/vespa-hadoop/src/test/pig/feed_create_operations.pig new file mode 100644 index 00000000000..2186935b59a --- /dev/null 
+++ b/vespa-hadoop/src/test/pig/feed_create_operations.pig @@ -0,0 +1,23 @@ +-- REGISTER vespa-hadoop.jar -- Not needed in tests + +-- Create valid Vespa put operations +DEFINE VespaPutOperation + com.yahoo.vespa.hadoop.pig.VespaDocumentOperation( + 'operation=put', + 'docid=id:<application>:metrics::<name>-<date>' + ); + +-- By default, VespaStorage assumes it's feeding valid Vespa operations +DEFINE VespaStorage + com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load tabular data +metrics = LOAD 'src/test/resources/tabular_data.csv' AS (date:chararray, name:chararray, value:int, application:chararray); + +-- Transform tabular data to a Vespa document operation JSON format +metrics = FOREACH metrics GENERATE VespaPutOperation(*); + +-- Store into Vespa +STORE metrics INTO '$ENDPOINT' USING VespaStorage(); + + diff --git a/vespa-hadoop/src/test/pig/feed_create_operations_short_form.pig b/vespa-hadoop/src/test/pig/feed_create_operations_short_form.pig new file mode 100644 index 00000000000..348616f00ad --- /dev/null +++ b/vespa-hadoop/src/test/pig/feed_create_operations_short_form.pig @@ -0,0 +1,18 @@ +-- REGISTER vespa-hadoop.jar -- Not needed in tests + +-- Transform tabular data to a Vespa document operation JSON format +-- as part of storing the data. +DEFINE VespaStorage + com.yahoo.vespa.hadoop.pig.VespaStorage( + 'create-document-operation=true', + 'operation=put', + 'docid=id:<application>:metrics::<name>-<date>' + ); + +-- Load tabular data +metrics = LOAD 'src/test/resources/tabular_data.csv' AS (date:chararray, name:chararray, value:int, application:chararray); + +-- Store into Vespa +STORE metrics INTO '$ENDPOINT' USING VespaStorage(); + + diff --git a/vespa-hadoop/src/test/pig/feed_multiline_operations.pig b/vespa-hadoop/src/test/pig/feed_multiline_operations.pig new file mode 100644 index 00000000000..e9efb36858b --- /dev/null +++ b/vespa-hadoop/src/test/pig/feed_multiline_operations.pig @@ -0,0 +1,14 @@ +-- REGISTER vespa-hadoop.jar -- Not needed in tests + +-- Define short name for VespaJsonLoader +DEFINE VespaJsonLoader com.yahoo.vespa.hadoop.pig.VespaSimpleJsonLoader(); + +-- Define short name for VespaStorage +DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load data - one column for json data +metrics = LOAD 'src/test/resources/operations_multiline_data.json' USING VespaJsonLoader() AS (data:chararray); + +-- Store into Vespa +STORE metrics INTO '$ENDPOINT' USING VespaStorage(); + diff --git a/vespa-hadoop/src/test/pig/feed_operations.pig b/vespa-hadoop/src/test/pig/feed_operations.pig new file mode 100644 index 00000000000..327181d4410 --- /dev/null +++ b/vespa-hadoop/src/test/pig/feed_operations.pig @@ -0,0 +1,10 @@ +-- REGISTER vespa-hadoop.jar -- Not needed in tests + +-- Define short name for VespaStorage +DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load data - one column for json data +metrics = LOAD 'src/test/resources/operations_data.json' AS (data:chararray); + +-- Store into Vespa +STORE metrics INTO '$ENDPOINT' USING VespaStorage();
\ No newline at end of file diff --git a/vespa-hadoop/src/test/pig/feed_operations_with_json_loader.pig b/vespa-hadoop/src/test/pig/feed_operations_with_json_loader.pig new file mode 100644 index 00000000000..6d31201e4eb --- /dev/null +++ b/vespa-hadoop/src/test/pig/feed_operations_with_json_loader.pig @@ -0,0 +1,13 @@ +-- REGISTER vespa-hadoop.jar -- Not needed in tests + +-- Define short name for VespaJsonLoader +DEFINE VespaJsonLoader com.yahoo.vespa.hadoop.pig.VespaSimpleJsonLoader(); + +-- Define short name for VespaStorage +DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load data - one column for json data +metrics = LOAD 'src/test/resources/operations_data.json' USING VespaJsonLoader() AS (data:chararray); + +-- Store into Vespa +STORE metrics INTO '$ENDPOINT' USING VespaStorage();
\ No newline at end of file diff --git a/vespa-hadoop/src/test/pig/feed_operations_xml.pig b/vespa-hadoop/src/test/pig/feed_operations_xml.pig new file mode 100644 index 00000000000..d109d56ad1e --- /dev/null +++ b/vespa-hadoop/src/test/pig/feed_operations_xml.pig @@ -0,0 +1,10 @@ +-- REGISTER vespa-hadoop.jar -- Not needed in tests + +-- Define short name for VespaStorage +DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load data - one column for xml data +data = LOAD 'src/test/resources/operations_data.xml' AS (data:chararray); + +-- Store into Vespa +STORE data INTO '$ENDPOINT' USING VespaStorage();
\ No newline at end of file diff --git a/vespa-hadoop/src/test/pig/feed_visit_data.pig b/vespa-hadoop/src/test/pig/feed_visit_data.pig new file mode 100644 index 00000000000..14010c38336 --- /dev/null +++ b/vespa-hadoop/src/test/pig/feed_visit_data.pig @@ -0,0 +1,11 @@ +-- REGISTER vespa-hadoop.jar -- Not needed in tests + +-- Define short name for VespaStorage +DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load data - one column for json data +metrics = LOAD 'src/test/resources/visit_data.json' AS (data:chararray); + +-- Store into Vespa +STORE metrics INTO '$ENDPOINT' USING VespaStorage(); + diff --git a/vespa-hadoop/src/test/pig/query.pig b/vespa-hadoop/src/test/pig/query.pig new file mode 100644 index 00000000000..70f53a992e2 --- /dev/null +++ b/vespa-hadoop/src/test/pig/query.pig @@ -0,0 +1,18 @@ +-- REGISTER vespa-hadoop.jar -- Not needed in tests + +-- Define Vespa query for retrieving blog posts +DEFINE BlogPostRecommendations + com.yahoo.vespa.hadoop.pig.VespaQuery( + 'query=$ENDPOINT/search?query=<userid>&hits=100', + 'schema=rank:int,id:chararray,relevance:double,fields/id:chararray,fields/content:chararray' + ); + +-- Load data from a local file +users = LOAD 'src/test/resources/user_ids.csv' AS (userid:chararray); +users = FILTER users BY userid IS NOT null; + +-- Run a set of queries against Vespa +recommendations = FOREACH users GENERATE userid, FLATTEN(BlogPostRecommendations(*)); + +-- Output recommendations +DUMP recommendations; diff --git a/vespa-hadoop/src/test/pig/query_alt_root.pig b/vespa-hadoop/src/test/pig/query_alt_root.pig new file mode 100644 index 00000000000..8995990e398 --- /dev/null +++ b/vespa-hadoop/src/test/pig/query_alt_root.pig @@ -0,0 +1,19 @@ +-- REGISTER vespa-hadoop.jar -- Not needed in tests + +-- Define Vespa query for retrieving blog posts +DEFINE BlogPostRecommendations + com.yahoo.vespa.hadoop.pig.VespaQuery( + 'query=$ENDPOINT/search?query=<userid>&hits=100', + 'rootnode=root/children/children', + 'schema=rank:int,id:chararray,relevance:double,fields/id:chararray,fields/content:chararray' + ); + +-- Load data from a local file +users = LOAD 'src/test/resources/user_ids.csv' AS (userid:chararray); +users = FILTER users BY userid IS NOT null; + +-- Run a set of queries against Vespa +recommendations = FOREACH users GENERATE userid, FLATTEN(BlogPostRecommendations(*)); + +-- Output recommendations +DUMP recommendations; diff --git a/vespa-hadoop/src/test/resources/operations_data.json b/vespa-hadoop/src/test/resources/operations_data.json new file mode 100644 index 00000000000..5af436dbfe7 --- /dev/null +++ b/vespa-hadoop/src/test/resources/operations_data.json @@ -0,0 +1,10 @@ +{"put":"id:testapp:metric::clicks-2015110414","fields":{"date":"2015110414","name":"clicks","value":1,"application":"testapp"}} +{"fields":{"date":"2015110416","name":"clicks","value":5,"application":"testapp"},"put":"id:testapp:metric::clicks-2015110416"} +{"put":"id:testapp:metric::clicks-2015110415","fields":{"date":"2015110415","name":"clicks","value":2,"application":"testapp"}} +{"put":"id:testapp:metric::clicks-2015110417","fields":{"date":"2015110417","name":"clicks","value":3,"application":"testapp"}} +{"put":"id:testapp:metric::clicks-2015110418","fields":{"date":"2015110418","name":"clicks","value":6,"application":"testapp"}} +{"put":"id:testapp:metric::clicks-2015110419","fields":{"date":"2015110419","name":"clicks","value":3,"application":"testapp"}} 
+{"put":"id:testapp:metric::clicks-2015110420","fields":{"date":"2015110420","name":"clicks","value":4,"application":"testapp"}} +{"put":"id:testapp:metric::clicks-2015110421","fields":{"date":"2015110421","name":"clicks","value":2,"application":"testapp"}} +{"fields":{"date":"2015110422","name":"clicks","value":5,"application":"testapp"},"condition":"metrics==0","put":"id:testapp:metric::clicks-2015110422"} +{"put":"id:testapp:metric::clicks-2015110423","fields":{"date":"2015110423","name":"clicks","value":1,"application":"testapp"}} diff --git a/vespa-hadoop/src/test/resources/operations_data.xml b/vespa-hadoop/src/test/resources/operations_data.xml new file mode 100644 index 00000000000..cdf1ca78c1d --- /dev/null +++ b/vespa-hadoop/src/test/resources/operations_data.xml @@ -0,0 +1,13 @@ +<?xml version="1.0" encoding="utf-8"?> +<vespafeed> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/a-ha/Scoundrel+Days"> <url>http://music.yahoo.com/a-ha/Scoundrel+Days</url> <title><![CDATA[Scoundrel Days]]></title> <artist><![CDATA[a-ha]]></artist> <year>0</year> <popularity>290</popularity> </document> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Accept/Restless+And+Wild"> <url>http://music.yahoo.com/Accept/Restless+And+Wild</url> <title><![CDATA[Restless And Wild]]></title> <artist><![CDATA[Accept]]></artist> <year>0</year> <popularity>75</popularity> </document> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Accept/Staying+A+Life"> <url>http://music.yahoo.com/Accept/Staying+A+Life</url> <title><![CDATA[Staying A Life]]></title> <artist><![CDATA[Accept]]></artist> <year>1985</year> <popularity>77</popularity> </document> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Alice+In+Chains/Dirt"> <url>http://music.yahoo.com/Alice+In+Chains/Dirt</url> <title><![CDATA[Dirt]]></title> <artist><![CDATA[Alice In Chains]]></artist> <year>1992</year> <popularity>114</popularity> </document> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Alice+In+Chains/Live"> <url>http://music.yahoo.com/Alice+In+Chains/Live</url> <title><![CDATA[Live]]></title> <artist><![CDATA[Alice In Chains]]></artist> <year>1990</year> <popularity>363</popularity> </document> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Amy+MacDonald/This+Is+The+Life"> <url>http://music.yahoo.com/Amy+MacDonald/This+Is+The+Life</url> <title><![CDATA[This Is The Life]]></title> <artist><![CDATA[Amy MacDonald]]></artist> <year>2007</year> <popularity>355</popularity> </document> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Ane+Brun/Duets"> <url>http://music.yahoo.com/Ane+Brun/Duets</url> <title><![CDATA[Duets]]></title> <artist><![CDATA[Ane Brun]]></artist> <year>0</year> <popularity>255</popularity> </document> + <update documenttype="music" documentid="id:music:music::http://music.yahoo.com/bobdylan/BestOf"><assign field="title">The Best of Bob Dylan</assign><add field="tracks"><item>Man Of Constant Sorrow</item></add></update> + <remove documentid="id:music:music::http://music.yahoo.com/Aqpop/Beautifully+Smart" /> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Annuals/Be+He+Me"> <url>http://music.yahoo.com/Annuals/Be+He+Me</url> <title><![CDATA[Be He Me]]></title> <artist><![CDATA[Annuals]]></artist> <year>0</year> <popularity>207</popularity> </document> 
+</vespafeed> diff --git a/vespa-hadoop/src/test/resources/operations_multiline_data.json b/vespa-hadoop/src/test/resources/operations_multiline_data.json new file mode 100644 index 00000000000..2b51698d9b7 --- /dev/null +++ b/vespa-hadoop/src/test/resources/operations_multiline_data.json @@ -0,0 +1,93 @@ +[ + { + "put": "id:testapp:metric::clicks-2015110414", + "fields": { + "date": "2015110414", + "name": "clicks", + "value": 1, + "application": "testapp" + } + }, + { + "fields": { + "date": "2015110416", + "name": "clicks", + "value": 5, + "application": "testapp" + }, + "put": "id:testapp:metric::clicks-2015110416" + }, + { + "put": "id:testapp:metric::clicks-2015110415", + "fields": { + "date": "2015110415", + "name": "clicks", + "value": 2, + "application": "testapp" + } + }, + { + "put": "id:testapp:metric::clicks-2015110417", + "fields": { + "date": "2015110417", + "name": "clicks", + "value": 3, + "application": "testapp" + } + }, + { + "put": "id:testapp:metric::clicks-2015110418", + "fields": { + "date": "2015110418", + "name": "clicks", + "value": 6, + "application": "testapp" + } + }, + { + "put": "id:testapp:metric::clicks-2015110419", + "fields": { + "date": "2015110419", + "name": "clicks", + "value": 3, + "application": "testapp" + } + }, + { + "put": "id:testapp:metric::clicks-2015110420", + "fields": { + "date": "2015110420", + "name": "clicks", + "value": 4, + "application": "testapp" + } + }, + { + "put": "id:testapp:metric::clicks-2015110421", + "fields": { + "date": "2015110421", + "name": "clicks", + "value": 2, + "application": "testapp" + } + }, + { + "fields": { + "date": "2015110422", + "name": "clicks", + "value": 5, + "application": "testapp" + }, + "condition": "metrics==0", + "put": "id:testapp:metric::clicks-2015110422" + }, + { + "put": "id:testapp:metric::clicks-2015110423", + "fields": { + "date": "2015110423", + "name": "clicks", + "value": 1, + "application": "testapp" + } + } +] diff --git a/vespa-hadoop/src/test/resources/tabular_data.csv b/vespa-hadoop/src/test/resources/tabular_data.csv new file mode 100644 index 00000000000..541597998e9 --- /dev/null +++ b/vespa-hadoop/src/test/resources/tabular_data.csv @@ -0,0 +1,11 @@ +2015110414 clicks 1 testapp +2015110415 clicks 2 testapp +2015110416 clicks 5 testapp +2015110417 clicks 3 testapp +2015110418 clicks 6 testapp +2015110419 clicks 3 testapp +2015110420 clicks 4 testapp +2015110421 clicks 2 testapp +2015110422 clicks 5 testapp +2015110423 clicks 1 testapp + diff --git a/vespa-hadoop/src/test/resources/user_ids.csv b/vespa-hadoop/src/test/resources/user_ids.csv new file mode 100644 index 00000000000..5875a3b9a7c --- /dev/null +++ b/vespa-hadoop/src/test/resources/user_ids.csv @@ -0,0 +1,4 @@ +5 +104 +313 + diff --git a/vespa-hadoop/src/test/resources/visit_data.json b/vespa-hadoop/src/test/resources/visit_data.json new file mode 100644 index 00000000000..a48fc9cf1c0 --- /dev/null +++ b/vespa-hadoop/src/test/resources/visit_data.json @@ -0,0 +1,12 @@ +[ +{"id":"id:testapp:metric::clicks-2015110414","fields":{"date":"2015110414","name":"clicks","value":1,"application":"testapp"}}, +{"id":"id:testapp:metric::clicks-2015110415","fields":{"date":"2015110415","name":"clicks","value":2,"application":"testapp"}}, +{"id":"id:testapp:metric::clicks-2015110416","fields":{"date":"2015110416","name":"clicks","value":4,"application":"testapp"}}, +{"id":"id:testapp:metric::clicks-2015110417","fields":{"date":"2015110417","name":"clicks","value":3,"application":"testapp"}}, 
+{"id":"id:testapp:metric::clicks-2015110418","fields":{"date":"2015110418","name":"clicks","value":6,"application":"testapp"}}, +{"id":"id:testapp:metric::clicks-2015110419","fields":{"date":"2015110419","name":"clicks","value":3,"application":"testapp"}}, +{"id":"id:testapp:metric::clicks-2015110420","fields":{"date":"2015110420","name":"clicks","value":4,"application":"testapp"}}, +{"id":"id:testapp:metric::clicks-2015110421","fields":{"date":"2015110421","name":"clicks","value":2,"application":"testapp"}}, +{"id":"id:testapp:metric::clicks-2015110422","fields":{"date":"2015110422","name":"clicks","value":7,"application":"testapp"}}, +{"id":"id:testapp:metric::clicks-2015110423","fields":{"date":"2015110423","name":"clicks","value":1,"application":"testapp"}} +] |