diff options
author | Kristian Aune <kkraune@users.noreply.github.com> | 2017-03-17 09:45:12 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-03-17 09:45:12 +0100 |
commit | b09d7deb64e5a723c5a052dd2b1db225f632405f (patch) | |
tree | f9a240cc40b06e0662175a9101a56cf8edd6780b | |
parent | 772090db5b9a3bf4dfa34d1385e3c549bee8fa95 (diff) | |
parent | 8bbdd803f7bd67d51b8dc7f0abc906f793b42520 (diff) |
Merge pull request #2023 from yahoo/lesters/move-vespa-hadoop
Move vespa-hadoop to Vespa repository
24 files changed, 600 insertions, 74 deletions
diff --git a/hadoop/README b/hadoop/README index 66ce7113337..1b567b88c1d 100644 --- a/hadoop/README +++ b/hadoop/README @@ -1,7 +1,4 @@ -The grid feeding client. +The Vespa Hadoop client. -TODO: the code here is currently not the code that is built and distributed. -That is in https://git.corp.yahoo.com/vespa/vespa-hadoop. Soon, that code will -be moved here as it has a close relationship to the Java feed client. Also, -system tests will be added. +Contains APIs for feeding and querying Vespa from the grid. diff --git a/hadoop/pom.xml b/hadoop/pom.xml index e12b9cbd86a..f23b73abc54 100644 --- a/hadoop/pom.xml +++ b/hadoop/pom.xml @@ -15,11 +15,12 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <hadoop.version>2.6.0</hadoop.version> + <hadoop.version>2.7.3</hadoop.version> <pig.version>0.14.0</pig.version> </properties> <dependencies> + <!-- Hadoop dependencies --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> @@ -40,6 +41,33 @@ <scope>provided</scope> </dependency> + <!-- These are inherited from parent. Needed for correct versions on Hadoop. --> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>1.2</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>2.5.0</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.4.1</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + <version>4.4.1</version> + <scope>compile</scope> + </dependency> + + <!-- Test dependencies --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> @@ -53,6 +81,7 @@ <scope>test</scope> </dependency> + <!-- Vespa feeding dependencies --> <dependency> <groupId>com.yahoo.vespa</groupId> <artifactId>vespa-http-client</artifactId> @@ -66,10 +95,12 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> <configuration> - <source>1.8</source> - <target>1.8</target> + <showDeprecation>true</showDeprecation> + <compilerArgs> + <arg>-Xlint:all</arg> + <arg>-Werror</arg> + </compilerArgs> </configuration> </plugin> diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java index 5181ef05e2c..720a6adf477 100644 --- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java +++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaOutputFormat.java @@ -14,6 +14,7 @@ import java.util.Properties; * * @author lesters */ +@SuppressWarnings("rawtypes") public class VespaOutputFormat extends OutputFormat { private final Properties configOverride; @@ -45,7 +46,6 @@ public class VespaOutputFormat extends OutputFormat { @Override public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { - } } diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java index 015d79cd177..8072e99e2ab 100644 --- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java +++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java @@ -1,5 +1,23 @@ package com.yahoo.vespa.hadoop.mapreduce; +import java.io.IOException; +import java.io.StringReader; +import java.util.List; +import java.util.Random; +import java.util.StringTokenizer; +import java.util.logging.Logger; + +import javax.xml.namespace.QName; +import javax.xml.stream.FactoryConfigurationError; +import javax.xml.stream.XMLEventReader; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.events.StartElement; +import javax.xml.stream.events.XMLEvent; + +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; @@ -10,14 +28,13 @@ import com.yahoo.vespa.hadoop.pig.VespaDocumentOperation; import com.yahoo.vespa.http.client.FeedClient; import com.yahoo.vespa.http.client.FeedClientFactory; import com.yahoo.vespa.http.client.Result; -import com.yahoo.vespa.http.client.config.*; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; -import java.util.List; -import java.util.StringTokenizer; -import java.util.logging.Logger; +import com.yahoo.vespa.http.client.config.Cluster; +import com.yahoo.vespa.http.client.config.ConnectionParams; +import com.yahoo.vespa.http.client.config.Endpoint; +import com.yahoo.vespa.http.client.config.FeedParams; +import com.yahoo.vespa.http.client.config.FeedParams.DataFormat; +import com.yahoo.vespa.http.client.config.SessionParams; +import org.apache.hadoop.mapreduce.v2.app.job.Task; /** * VespaRecordWriter sends the output <key, value> to one or more Vespa @@ -25,6 +42,7 @@ import java.util.logging.Logger; * * @author lesters */ +@SuppressWarnings("rawtypes") public class VespaRecordWriter extends RecordWriter { private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName()); @@ -36,7 +54,6 @@ public class VespaRecordWriter extends RecordWriter { private final VespaConfiguration configuration; private final int progressInterval; - VespaRecordWriter(VespaConfiguration configuration, VespaCounters counters) { this.counters = counters; this.configuration = configuration; @@ -50,13 +67,13 @@ public class VespaRecordWriter extends RecordWriter { initialize(); } - // Assumption: json - xml not currently supported - String json = data.toString().trim(); + String doc = data.toString().trim(); - // Parse json to find document id - if none found, skip this write - String docId = findDocId(json); + // Parse data to find document id - if none found, skip this write + String docId = DataFormat.JSON_UTF8.equals(configuration.dataFormat()) ? findDocId(doc) + : findDocIdFromXml(doc); if (docId != null && docId.length() >= 0) { - feedClient.stream(docId, json); + feedClient.stream(docId, doc); counters.incrementDocumentsSent(1); } else { counters.incrementDocumentsSkipped(1); @@ -83,11 +100,20 @@ public class VespaRecordWriter extends RecordWriter { private void initialize() { + if (!configuration.dryrun() && configuration.randomSartupSleepMs() > 0) { + int delay = new Random().nextInt(configuration.randomSartupSleepMs()); + log.info("VespaStorage: Delaying startup by " + delay + " ms"); + try { + Thread.sleep(delay); + } catch (Exception e) {} + } + ConnectionParams.Builder connParamsBuilder = new ConnectionParams.Builder(); connParamsBuilder.setDryRun(configuration.dryrun()); connParamsBuilder.setUseCompression(configuration.useCompression()); connParamsBuilder.setEnableV3Protocol(configuration.useV3Protocol()); connParamsBuilder.setNumPersistentConnectionsPerEndpoint(configuration.numConnections()); + connParamsBuilder.setMaxRetries(configuration.numRetries()); if (configuration.proxyHost() != null) { connParamsBuilder.setProxyHost(configuration.proxyHost()); } @@ -98,6 +124,8 @@ public class VespaRecordWriter extends RecordWriter { FeedParams.Builder feedParamsBuilder = new FeedParams.Builder(); feedParamsBuilder.setDataFormat(configuration.dataFormat()); feedParamsBuilder.setRoute(configuration.route()); + feedParamsBuilder.setMaxSleepTimeMs(configuration.maxSleepTimeMs()); + feedParamsBuilder.setMaxInFlightRequests(configuration.maxInFlightRequests()); SessionParams.Builder sessionParams = new SessionParams.Builder(); sessionParams.setThrottlerMinSize(configuration.throttlerMinSize()); @@ -117,9 +145,29 @@ public class VespaRecordWriter extends RecordWriter { feedClient = FeedClientFactory.create(sessionParams.build(), resultCallback); initialized = true; + log.info("VespaStorage configuration:\n" + configuration.toString()); } - + private String findDocIdFromXml(String xml) { + try { + XMLEventReader eventReader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml)); + while (eventReader.hasNext()) { + XMLEvent event = eventReader.nextEvent(); + if (event.getEventType() == XMLEvent.START_ELEMENT) { + StartElement element = event.asStartElement(); + String elementName = element.getName().getLocalPart(); + if (VespaDocumentOperation.Operation.valid(elementName)) { + return element.getAttributeByName(QName.valueOf("documentid")).getValue(); + } + } + } + } catch (XMLStreamException | FactoryConfigurationError e) { + // as json dude does + return null; + } + return null; + } + private String findDocId(String json) throws IOException { JsonFactory factory = new JsonFactory(); try(JsonParser parser = factory.createParser(json)) { diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java new file mode 100644 index 00000000000..d7bdc592fd5 --- /dev/null +++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaSimpleJsonInputFormat.java @@ -0,0 +1,98 @@ +package com.yahoo.vespa.hadoop.mapreduce; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +import java.io.BufferedInputStream; +import java.io.IOException; + +/** + * Simple JSON reader which splits the input file along JSON object boundaries. + * + * There are two cases handled here: + * 1. Each line contains a JSON object, i.e. { ... } + * 2. The file contains an array of objects with arbitrary line breaks, i.e. [ {...}, {...} ] + * + * Not suitable for cases where you want to extract objects from some other arbitrary structure. + * + * TODO: Support config which points to a array in the JSON as start point for object extraction, + * ala how it is done in VespaHttpClient.parseResultJson, i.e. support rootNode config. + * + * @author lesters + */ +public class VespaSimpleJsonInputFormat extends FileInputFormat<Text, NullWritable> { + + @Override + public RecordReader<Text, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + return new VespaJsonRecordReader(); + } + + public static class VespaJsonRecordReader extends RecordReader<Text, NullWritable> { + private long remaining; + private JsonParser parser; + private Text currentKey; + private NullWritable currentValue = NullWritable.get(); + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + FileSplit fileSplit = (FileSplit) split; + FSDataInputStream stream = FileSystem.get(context.getConfiguration()).open(fileSplit.getPath()); + if (fileSplit.getStart() != 0) { + stream.seek(fileSplit.getStart()); + } + + remaining = fileSplit.getLength(); + + JsonFactory factory = new JsonFactory(); + parser = factory.createParser(new BufferedInputStream(stream)); + parser.setCodec(new ObjectMapper()); + parser.nextToken(); + if (parser.currentToken() == JsonToken.START_ARRAY) { + parser.nextToken(); + } + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (parser.currentToken() != JsonToken.START_OBJECT) { + return true; + } + currentKey = new Text(parser.readValueAsTree().toString()); + parser.nextToken(); + return false; + } + + @Override + public Text getCurrentKey() throws IOException, InterruptedException { + return currentKey; + } + + @Override + public NullWritable getCurrentValue() throws IOException, InterruptedException { + return currentValue; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return parser.getCurrentLocation().getByteOffset() / remaining; + } + + @Override + public void close() throws IOException { + parser.close(); + } + } + +} + diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java index cd692c69ba7..1d11a3f9fdd 100644 --- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java +++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/TupleTools.java @@ -60,7 +60,7 @@ public class TupleTools { while (m.find()) { Object value = fields.get(m.group(1)); String replacement = value != null ? value.toString() : m.group(0); - m.appendReplacement(sb, replacement); + m.appendReplacement(sb, Matcher.quoteReplacement(replacement)); } m.appendTail(sb); return sb.toString(); diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java index f11b09894d0..6e35e5e465a 100644 --- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java +++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaConfiguration.java @@ -21,6 +21,10 @@ public class VespaConfiguration { public static final String THROTTLER_MIN_SIZE = "vespa.feed.throttler.min.size"; public static final String QUERY_CONNECTION_TIMEOUT = "vespa.query.connection.timeout"; public static final String ROUTE = "vespa.feed.route"; + public static final String MAX_SLEEP_TIME_MS = "vespa.feed.max.sleep.time.ms"; + public static final String MAX_IN_FLIGHT_REQUESTS = "vespa.feed.max.in.flight.requests"; + public static final String RANDOM_STARTUP_SLEEP = "vespa.feed.random.startup.sleep.ms"; + public static final String NUM_RETRIES = "vespa.feed.num.retries"; private final Configuration conf; private final Properties override; @@ -62,12 +66,12 @@ public class VespaConfiguration { public boolean useV3Protocol() { - return getBoolean(V3_PROTOCOL, false); + return getBoolean(V3_PROTOCOL, true); } public int numConnections() { - return getInt(CONNECTIONS, 8); + return getInt(CONNECTIONS, 2); } @@ -80,11 +84,32 @@ public class VespaConfiguration { return getInt(QUERY_CONNECTION_TIMEOUT, 10000); } + public String route() { return getString(ROUTE); } + public int maxSleepTimeMs() { + return getInt(MAX_SLEEP_TIME_MS, 10000); + } + + + public int maxInFlightRequests() { + return getInt(MAX_IN_FLIGHT_REQUESTS, 1000); + } + + + public int randomSartupSleepMs() { + return getInt(RANDOM_STARTUP_SLEEP, 30000); + } + + + public int numRetries() { + return getInt(NUM_RETRIES, 100); + } + + public FeedParams.DataFormat dataFormat() { String format = getString(DATA_FORMAT); if ("xml".equalsIgnoreCase(format)) { @@ -93,6 +118,7 @@ public class VespaConfiguration { return FeedParams.DataFormat.JSON_UTF8; } + public int progressInterval() { return getInt(PROGRESS_REPORT, 1000); } @@ -136,4 +162,26 @@ public class VespaConfiguration { return properties; } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(ENDPOINT + ": " + endpoint() + "\n"); + sb.append(PROXY_HOST + ": " + proxyHost() + "\n"); + sb.append(PROXY_PORT + ": " + proxyPort() + "\n"); + sb.append(DRYRUN + ": " + dryrun() +"\n"); + sb.append(USE_COMPRESSION + ": " + useCompression() +"\n"); + sb.append(DATA_FORMAT + ": " + dataFormat() +"\n"); + sb.append(PROGRESS_REPORT + ": " + progressInterval() +"\n"); + sb.append(V3_PROTOCOL + ": " + useV3Protocol() +"\n"); + sb.append(CONNECTIONS + ": " + numConnections() +"\n"); + sb.append(THROTTLER_MIN_SIZE + ": " + throttlerMinSize() +"\n"); + sb.append(QUERY_CONNECTION_TIMEOUT + ": " + queryConnectionTimeout() +"\n"); + sb.append(ROUTE + ": " + route() +"\n"); + sb.append(MAX_SLEEP_TIME_MS + ": " + maxSleepTimeMs() +"\n"); + sb.append(MAX_IN_FLIGHT_REQUESTS + ": " + maxInFlightRequests() +"\n"); + sb.append(RANDOM_STARTUP_SLEEP + ": " + randomSartupSleepMs() +"\n"); + sb.append(NUM_RETRIES + ": " + numRetries() +"\n"); + return sb.toString(); + } + } diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java index e63a4044c95..50a089c8b45 100644 --- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java +++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/util/VespaHttpClient.java @@ -24,12 +24,13 @@ public class VespaHttpClient { } public VespaHttpClient(VespaConfiguration configuration) { - httpClient = createClient(configuration); + httpClient = createClient(configuration); } public String get(String url) throws IOException { HttpGet httpGet = new HttpGet(url); HttpResponse httpResponse = httpClient.execute(httpGet); + HttpEntity entity = httpResponse.getEntity(); InputStream is = entity.getContent(); @@ -47,16 +48,36 @@ public class VespaHttpClient { return result; } - public JsonNode parseResultJson(String json) throws IOException { + public JsonNode parseResultJson(String json, String rootNode) throws IOException { if (json == null || json.isEmpty()) { return null; } + if (rootNode == null || rootNode.isEmpty()) { + return null; + } + ObjectMapper m = new ObjectMapper(); JsonNode node = m.readTree(json); if (node != null) { - node = node.get("root"); - if (node != null) { - node = node.get("children"); + String[] path = rootNode.split("/"); + for (String p : path) { + node = node.get(p); + + if (node == null) { + return null; + } + + // if node is an array, return the first node that has the correct path + if (node.isArray()) { + for (int i = 0; i < node.size(); ++i) { + JsonNode n = node.get(i); + if (n.has(p)) { + node = n; + break; + } + } + } + } } return node; diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java index 20e9af222f5..017ffcdd215 100644 --- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java +++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools; import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.pig.EvalFunc; import org.apache.pig.PigWarning; import org.apache.pig.data.DataBag; @@ -27,6 +28,7 @@ import java.util.*; public class VespaDocumentOperation extends EvalFunc<String> { public enum Operation { + DOCUMENT, PUT, ID, REMOVE, @@ -108,7 +110,13 @@ public class VespaDocumentOperation extends EvalFunc<String> { } catch (Exception e) { - throw new IOException("Caught exception processing input row ", e); + StringBuilder sb = new StringBuilder(); + sb.append("Caught exception processing input row: \n"); + sb.append(tuple.toString()); + sb.append("\nException: "); + sb.append(ExceptionUtils.getStackTrace(e)); + warn(sb.toString(), PigWarning.UDF_WARNING_1); + return null; } return json; @@ -123,7 +131,7 @@ public class VespaDocumentOperation extends EvalFunc<String> { * @param docId Document id * @param fields Fields to put in document operation * @return A valid JSON Vespa document operation - * @throws IOException + * @throws IOException ... */ public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties) throws IOException { if (op == null) { diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java index 5faa6da94bd..e38e87dda57 100644 --- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java +++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaQuery.java @@ -23,16 +23,18 @@ public class VespaQuery extends EvalFunc<DataBag> { private final String PROPERTY_QUERY_TEMPLATE = "query"; private final String PROPERTY_QUERY_SCHEMA = "schema"; + private final String PROPERTY_ROOT_NODE = "rootnode"; private final VespaConfiguration configuration; private final Properties properties; private final String queryTemplate; private final String querySchema; + private final String queryRootNode; private VespaHttpClient httpClient; public VespaQuery() { - this(null); + this(new String[0]); } public VespaQuery(String... params) { @@ -45,7 +47,7 @@ public class VespaQuery extends EvalFunc<DataBag> { } querySchema = properties.getProperty(PROPERTY_QUERY_SCHEMA, "rank:int,id:chararray"); - + queryRootNode = properties.getProperty(PROPERTY_ROOT_NODE, "root/children"); } @Override @@ -89,7 +91,7 @@ public class VespaQuery extends EvalFunc<DataBag> { } private JsonNode parseVespaResultJson(String result) throws IOException { - return httpClient == null ? null : httpClient.parseResultJson(result); + return httpClient == null ? null : httpClient.parseResultJson(result, queryRootNode); } private DataBag createPigRepresentation(JsonNode hits) { diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java new file mode 100644 index 00000000000..66f04be657f --- /dev/null +++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaSimpleJsonLoader.java @@ -0,0 +1,62 @@ +package com.yahoo.vespa.hadoop.pig; + +import com.yahoo.vespa.hadoop.mapreduce.VespaSimpleJsonInputFormat; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.pig.LoadFunc; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; + +import java.io.IOException; + +/** + * Simple JSON loader which loads either one JSON object per line or a + * multiline JSON consisting of objects in an array. + * + * Returns only the textual representation of the JSON object. + * + * @author lesters + */ +@SuppressWarnings("rawtypes") +public class VespaSimpleJsonLoader extends LoadFunc { + + private TupleFactory tupleFactory = TupleFactory.getInstance(); + private VespaSimpleJsonInputFormat.VespaJsonRecordReader recordReader; + + @Override + public void setLocation(String location, Job job) throws IOException { + FileInputFormat.setInputPaths(job, location); + } + + @Override + public InputFormat getInputFormat() throws IOException { + return new VespaSimpleJsonInputFormat(); + } + + @Override + public void prepareToRead(RecordReader reader, PigSplit split) throws IOException { + recordReader = (VespaSimpleJsonInputFormat.VespaJsonRecordReader) reader; + } + + @Override + public Tuple getNext() throws IOException { + try { + boolean done = recordReader.nextKeyValue(); + if (done) { + return null; + } + Text json = recordReader.getCurrentKey(); + if (json == null) { + return null; + } + return tupleFactory.newTuple(json.toString()); + + } catch (InterruptedException ex) { + return null; + } + } +} diff --git a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java index 4caa382223b..d5000b2b328 100644 --- a/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java +++ b/hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java @@ -23,6 +23,7 @@ import java.util.Properties; * * @author lesters */ +@SuppressWarnings("rawtypes") public class VespaStorage extends StoreFunc { private final boolean createDocOp; diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java index 7c44b54efcb..a79f63a77ce 100644 --- a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java +++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java @@ -16,6 +16,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java index 5e2bf9b887e..0e1a8fba17c 100644 --- a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java +++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java @@ -14,6 +14,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +@SuppressWarnings("serial") public class VespaDocumentOperationTest { @Test diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java index 9ba212293d0..0f123904cfb 100644 --- a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java +++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java @@ -1,49 +1,39 @@ package com.yahoo.vespa.hadoop.pig; import com.sun.net.httpserver.HttpServer; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaHttpClient; import com.yahoo.vespa.hadoop.util.MockQueryHandler; -import junit.framework.Assert; -import org.apache.avro.generic.GenericData; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.http.HttpEntity; -import org.apache.http.HttpHost; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.util.EntityUtils; import org.apache.pig.ExecType; import org.apache.pig.PigServer; -import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.data.Tuple; import org.junit.Test; -import java.io.IOException; -import java.io.InputStream; import java.net.InetSocketAddress; import java.util.*; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; public class VespaQueryTest { @Test public void requireThatQueriesAreReturnedCorrectly() throws Exception { - MockQueryHandler queryHandler = createQueryHandler(); + runQueryTest("src/test/pig/query.pig", createQueryHandler(""), 18901); + } + + @Test + public void requireThatQueriesAreReturnedCorrectlyWithAlternativeJsonRoot() throws Exception { + runQueryTest("src/test/pig/query_alt_root.pig", createQueryHandler("children"), 18902); + } - final int port = 18901; + private void runQueryTest(String script, MockQueryHandler queryHandler, int port) throws Exception { final String endpoint = "http://localhost:" + port; HttpServer server = HttpServer.create(new InetSocketAddress(port), 0); server.createContext("/", queryHandler); server.start(); - PigServer ps = setup("src/test/pig/query.pig", endpoint); + PigServer ps = setup(script, endpoint); Iterator<Tuple> recommendations = ps.openIterator("recommendations"); while (recommendations.hasNext()) { @@ -81,8 +71,8 @@ public class VespaQueryTest { return ps; } - private MockQueryHandler createQueryHandler() { - MockQueryHandler queryHandler = new MockQueryHandler(); + private MockQueryHandler createQueryHandler(String childNode) { + MockQueryHandler queryHandler = new MockQueryHandler(childNode); List<String> userIds = Arrays.asList("5", "104", "313"); diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java index 33756b3ad31..322c729b8c5 100644 --- a/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java +++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java @@ -1,7 +1,13 @@ package com.yahoo.vespa.hadoop.pig; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.mapred.Counters; @@ -11,26 +17,41 @@ import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.tools.pigstats.JobStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; - import org.junit.Test; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; public class VespaStorageTest { @Test + public void requireThatPremadeXmlOperationsFeedSucceeds() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.set(VespaConfiguration.DATA_FORMAT, "xml"); + assertAllDocumentsOk("src/test/pig/feed_operations_xml.pig", conf); + } + + + @Test public void requireThatPremadeOperationsFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_operations.pig"); } @Test + public void requireThatPremadeMultilineOperationsFeedSucceeds() throws Exception { + assertAllDocumentsOk("src/test/pig/feed_multiline_operations.pig"); + } + + + @Test + public void requireThatPremadeOperationsWithJsonLoaderFeedSucceeds() throws Exception { + assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig"); + } + + + @Test public void requireThatCreateOperationsFeedSucceeds() throws Exception { assertAllDocumentsOk("src/test/pig/feed_create_operations.pig"); } @@ -47,10 +68,13 @@ public class VespaStorageTest { assertAllDocumentsOk("src/test/pig/feed_visit_data.pig"); } - private PigServer setup(String script) throws Exception { - Configuration conf = new HdfsConfiguration(); - conf.set(VespaConfiguration.DRYRUN, "true"); - conf.set(VespaConfiguration.ENDPOINT, "dummy-endpoint"); + + private PigServer setup(String script, Configuration conf) throws Exception { + if (conf == null) { + conf = new HdfsConfiguration(); + } + conf.setIfUnset(VespaConfiguration.DRYRUN, "true"); + conf.setIfUnset(VespaConfiguration.ENDPOINT, "dummy-endpoint"); // Parameter substitutions - can also be set by configuration Map<String, String> parameters = new HashMap<>(); @@ -63,8 +87,14 @@ public class VespaStorageTest { return ps; } + private void assertAllDocumentsOk(String script) throws Exception { - PigServer ps = setup(script); + assertAllDocumentsOk(script, null); + } + + + private void assertAllDocumentsOk(String script, Configuration conf) throws Exception { + PigServer ps = setup(script, conf); List<ExecJob> jobs = ps.executeBatch(); PigStats stats = jobs.get(0).getStatistics(); for (JobStats js : stats.getJobGraph()) { diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java index f6977155b05..0bf9f6b447e 100644 --- a/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java +++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java @@ -19,9 +19,11 @@ import java.util.Map; public class MockQueryHandler implements HttpHandler { private final Map<String, List<MockQueryHit>> hitMap; + private final String childNode; - public MockQueryHandler() { + public MockQueryHandler(String childNode) { this.hitMap = new HashMap<>(); + this.childNode = childNode; } public void handle(HttpExchange t) throws IOException { @@ -145,9 +147,19 @@ public class MockQueryHandler implements HttpHandler { g.writeFieldName("children"); g.writeStartArray(); + + if (!childNode.isEmpty()) { + g.writeStartObject(); + g.writeFieldName(childNode); + g.writeStartArray(); + } } private void writeResultsEnd(JsonGenerator g) throws IOException { + if (!childNode.isEmpty()) { + g.writeEndArray(); + g.writeEndObject(); + } g.writeEndArray(); g.writeEndObject(); g.writeEndObject(); diff --git a/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java b/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java index 001879cbdf8..c98e7b1c02c 100644 --- a/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java +++ b/hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java @@ -33,4 +33,18 @@ public class TupleToolsTest { tuple.append(value); } + @Test + public void requireThatTupleToStringHandlesStringCharacters() throws IOException { + Schema schema = new Schema(); + Tuple tuple = TupleFactory.getInstance().newTuple(); + + addToTuple("id", DataType.CHARARRAY, "_!@#$%^&*()", schema, tuple); + addToTuple("rank", DataType.INTEGER, 1, schema, tuple); + + String template = "Id is <id> and rank is <rank>"; + String result = TupleTools.toString(schema, tuple, template); + + assertEquals("Id is _!@#$%^&*() and rank is 1", result); + } + } diff --git a/hadoop/src/test/pig/feed_multiline_operations.pig b/hadoop/src/test/pig/feed_multiline_operations.pig new file mode 100644 index 00000000000..e9efb36858b --- /dev/null +++ b/hadoop/src/test/pig/feed_multiline_operations.pig @@ -0,0 +1,14 @@ +-- REGISTER vespa-hadoop.jar -- Not needed in tests + +-- Define short name for VespaJsonLoader +DEFINE VespaJsonLoader com.yahoo.vespa.hadoop.pig.VespaSimpleJsonLoader(); + +-- Define short name for VespaStorage +DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load data - one column for json data +metrics = LOAD 'src/test/resources/operations_multiline_data.json' USING VespaJsonLoader() AS (data:chararray); + +-- Store into Vespa +STORE metrics INTO '$ENDPOINT' USING VespaStorage(); + diff --git a/hadoop/src/test/pig/feed_operations_with_json_loader.pig b/hadoop/src/test/pig/feed_operations_with_json_loader.pig new file mode 100644 index 00000000000..6d31201e4eb --- /dev/null +++ b/hadoop/src/test/pig/feed_operations_with_json_loader.pig @@ -0,0 +1,13 @@ +-- REGISTER vespa-hadoop.jar -- Not needed in tests + +-- Define short name for VespaJsonLoader +DEFINE VespaJsonLoader com.yahoo.vespa.hadoop.pig.VespaSimpleJsonLoader(); + +-- Define short name for VespaStorage +DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load data - one column for json data +metrics = LOAD 'src/test/resources/operations_data.json' USING VespaJsonLoader() AS (data:chararray); + +-- Store into Vespa +STORE metrics INTO '$ENDPOINT' USING VespaStorage();
\ No newline at end of file diff --git a/hadoop/src/test/pig/feed_operations_xml.pig b/hadoop/src/test/pig/feed_operations_xml.pig new file mode 100644 index 00000000000..d109d56ad1e --- /dev/null +++ b/hadoop/src/test/pig/feed_operations_xml.pig @@ -0,0 +1,10 @@ +-- REGISTER vespa-hadoop.jar -- Not needed in tests + +-- Define short name for VespaStorage +DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load data - one column for xml data +data = LOAD 'src/test/resources/operations_data.xml' AS (data:chararray); + +-- Store into Vespa +STORE data INTO '$ENDPOINT' USING VespaStorage();
\ No newline at end of file diff --git a/hadoop/src/test/pig/query_alt_root.pig b/hadoop/src/test/pig/query_alt_root.pig new file mode 100644 index 00000000000..8995990e398 --- /dev/null +++ b/hadoop/src/test/pig/query_alt_root.pig @@ -0,0 +1,19 @@ +-- REGISTER vespa-hadoop.jar -- Not needed in tests + +-- Define Vespa query for retrieving blog posts +DEFINE BlogPostRecommendations + com.yahoo.vespa.hadoop.pig.VespaQuery( + 'query=$ENDPOINT/search?query=<userid>&hits=100', + 'rootnode=root/children/children', + 'schema=rank:int,id:chararray,relevance:double,fields/id:chararray,fields/content:chararray' + ); + +-- Load data from a local file +users = LOAD 'src/test/resources/user_ids.csv' AS (userid:chararray); +users = FILTER users BY userid IS NOT null; + +-- Run a set of queries against Vespa +recommendations = FOREACH users GENERATE userid, FLATTEN(BlogPostRecommendations(*)); + +-- Output recommendations +DUMP recommendations; diff --git a/hadoop/src/test/resources/operations_data.xml b/hadoop/src/test/resources/operations_data.xml new file mode 100644 index 00000000000..cdf1ca78c1d --- /dev/null +++ b/hadoop/src/test/resources/operations_data.xml @@ -0,0 +1,13 @@ +<?xml version="1.0" encoding="utf-8"?> +<vespafeed> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/a-ha/Scoundrel+Days"> <url>http://music.yahoo.com/a-ha/Scoundrel+Days</url> <title><![CDATA[Scoundrel Days]]></title> <artist><![CDATA[a-ha]]></artist> <year>0</year> <popularity>290</popularity> </document> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Accept/Restless+And+Wild"> <url>http://music.yahoo.com/Accept/Restless+And+Wild</url> <title><![CDATA[Restless And Wild]]></title> <artist><![CDATA[Accept]]></artist> <year>0</year> <popularity>75</popularity> </document> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Accept/Staying+A+Life"> <url>http://music.yahoo.com/Accept/Staying+A+Life</url> <title><![CDATA[Staying A Life]]></title> <artist><![CDATA[Accept]]></artist> <year>1985</year> <popularity>77</popularity> </document> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Alice+In+Chains/Dirt"> <url>http://music.yahoo.com/Alice+In+Chains/Dirt</url> <title><![CDATA[Dirt]]></title> <artist><![CDATA[Alice In Chains]]></artist> <year>1992</year> <popularity>114</popularity> </document> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Alice+In+Chains/Live"> <url>http://music.yahoo.com/Alice+In+Chains/Live</url> <title><![CDATA[Live]]></title> <artist><![CDATA[Alice In Chains]]></artist> <year>1990</year> <popularity>363</popularity> </document> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Amy+MacDonald/This+Is+The+Life"> <url>http://music.yahoo.com/Amy+MacDonald/This+Is+The+Life</url> <title><![CDATA[This Is The Life]]></title> <artist><![CDATA[Amy MacDonald]]></artist> <year>2007</year> <popularity>355</popularity> </document> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Ane+Brun/Duets"> <url>http://music.yahoo.com/Ane+Brun/Duets</url> <title><![CDATA[Duets]]></title> <artist><![CDATA[Ane Brun]]></artist> <year>0</year> <popularity>255</popularity> </document> + <update documenttype="music" documentid="id:music:music::http://music.yahoo.com/bobdylan/BestOf"><assign field="title">The Best of Bob Dylan</assign><add field="tracks"><item>Man Of Constant Sorrow</item></add></update> + <remove documentid="id:music:music::http://music.yahoo.com/Aqpop/Beautifully+Smart" /> + <document documenttype="music" documentid="id:music:music::http://music.yahoo.com/Annuals/Be+He+Me"> <url>http://music.yahoo.com/Annuals/Be+He+Me</url> <title><![CDATA[Be He Me]]></title> <artist><![CDATA[Annuals]]></artist> <year>0</year> <popularity>207</popularity> </document> +</vespafeed> diff --git a/hadoop/src/test/resources/operations_multiline_data.json b/hadoop/src/test/resources/operations_multiline_data.json new file mode 100644 index 00000000000..2b51698d9b7 --- /dev/null +++ b/hadoop/src/test/resources/operations_multiline_data.json @@ -0,0 +1,93 @@ +[ + { + "put": "id:testapp:metric::clicks-2015110414", + "fields": { + "date": "2015110414", + "name": "clicks", + "value": 1, + "application": "testapp" + } + }, + { + "fields": { + "date": "2015110416", + "name": "clicks", + "value": 5, + "application": "testapp" + }, + "put": "id:testapp:metric::clicks-2015110416" + }, + { + "put": "id:testapp:metric::clicks-2015110415", + "fields": { + "date": "2015110415", + "name": "clicks", + "value": 2, + "application": "testapp" + } + }, + { + "put": "id:testapp:metric::clicks-2015110417", + "fields": { + "date": "2015110417", + "name": "clicks", + "value": 3, + "application": "testapp" + } + }, + { + "put": "id:testapp:metric::clicks-2015110418", + "fields": { + "date": "2015110418", + "name": "clicks", + "value": 6, + "application": "testapp" + } + }, + { + "put": "id:testapp:metric::clicks-2015110419", + "fields": { + "date": "2015110419", + "name": "clicks", + "value": 3, + "application": "testapp" + } + }, + { + "put": "id:testapp:metric::clicks-2015110420", + "fields": { + "date": "2015110420", + "name": "clicks", + "value": 4, + "application": "testapp" + } + }, + { + "put": "id:testapp:metric::clicks-2015110421", + "fields": { + "date": "2015110421", + "name": "clicks", + "value": 2, + "application": "testapp" + } + }, + { + "fields": { + "date": "2015110422", + "name": "clicks", + "value": 5, + "application": "testapp" + }, + "condition": "metrics==0", + "put": "id:testapp:metric::clicks-2015110422" + }, + { + "put": "id:testapp:metric::clicks-2015110423", + "fields": { + "date": "2015110423", + "name": "clicks", + "value": 1, + "application": "testapp" + } + } +] |