summaryrefslogtreecommitdiffstats
path: root/vespa-hadoop/src/test/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-hadoop/src/test/java/com')
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java197
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java274
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java96
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java110
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java218
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java50
6 files changed, 945 insertions, 0 deletions
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
new file mode 100644
index 00000000000..a79f63a77ce
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
@@ -0,0 +1,197 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.test.PathUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MapReduceTest {
+
+ protected static File hdfsBaseDir;
+ protected static FileSystem hdfs;
+ protected static Configuration conf;
+ protected static MiniDFSCluster cluster;
+
+ protected static Path metricsJsonPath;
+ protected static Path metricsCsvPath;
+
+ @BeforeClass
+ public static void setUp() throws IOException {
+ hdfsBaseDir = new File(PathUtils.getTestDir(MapReduceTest.class).getCanonicalPath());
+
+ conf = new HdfsConfiguration();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBaseDir.getAbsolutePath());
+ conf.set(VespaConfiguration.DRYRUN, "true");
+ conf.set(VespaConfiguration.ENDPOINT, "endpoint-does-not-matter-in-dryrun");
+
+ cluster = new MiniDFSCluster.Builder(conf).build();
+ hdfs = FileSystem.get(conf);
+
+ metricsJsonPath = new Path("metrics_json");
+ metricsCsvPath = new Path("metrics_csv");
+ copyToHdfs("src/test/resources/operations_data.json", metricsJsonPath, "data");
+ copyToHdfs("src/test/resources/tabular_data.csv", metricsCsvPath, "data");
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ Path testDir = new Path(hdfsBaseDir.getParent());
+ hdfs.delete(testDir, true);
+ cluster.shutdown();
+ LocalFileSystem localFileSystem = FileSystem.getLocal(conf);
+ localFileSystem.delete(testDir, true);
+ }
+
+ @Test
+ public void requireThatMapOnlyJobSucceeds() throws Exception {
+ Job job = Job.getInstance(conf);
+ job.setJarByClass(MapReduceTest.class);
+ job.setMapperClass(FeedMapper.class);
+ job.setOutputFormatClass(VespaOutputFormat.class);
+ job.setMapOutputValueClass(Text.class);
+
+ FileInputFormat.setInputPaths(job, metricsJsonPath);
+
+ boolean success = job.waitForCompletion(true);
+ assertTrue("Job Failed", success);
+
+ VespaCounters counters = VespaCounters.get(job);
+ assertEquals(10, counters.getDocumentsSent());
+ assertEquals(0, counters.getDocumentsFailed());
+ assertEquals(10, counters.getDocumentsOk());
+ }
+
+ @Test
+ public void requireThatMapReduceJobSucceeds() throws Exception {
+ Job job = Job.getInstance(conf);
+ job.setJarByClass(MapReduceTest.class);
+ job.setMapperClass(FeedMapper.class);
+ job.setOutputFormatClass(VespaOutputFormat.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setReducerClass(FeedReducer.class);
+ job.setNumReduceTasks(1);
+
+ FileInputFormat.setInputPaths(job, metricsJsonPath);
+
+ boolean success = job.waitForCompletion(true);
+ assertTrue("Job Failed", success);
+
+ VespaCounters counters = VespaCounters.get(job);
+ assertEquals(10, counters.getDocumentsSent());
+ assertEquals(0, counters.getDocumentsFailed());
+ assertEquals(10, counters.getDocumentsOk());
+ }
+
+
+ @Test
+ public void requireThatTransformMapJobSucceeds() throws Exception {
+ Job job = Job.getInstance(conf);
+ job.setJarByClass(MapReduceTest.class);
+ job.setMapperClass(ParsingMapper.class);
+ job.setOutputFormatClass(VespaOutputFormat.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setReducerClass(FeedReducer.class);
+ job.setNumReduceTasks(1);
+
+ FileInputFormat.setInputPaths(job, metricsCsvPath);
+
+ boolean success = job.waitForCompletion(true);
+ assertTrue("Job Failed", success);
+
+ VespaCounters counters = VespaCounters.get(job);
+ assertEquals(10, counters.getDocumentsSent());
+ assertEquals(0, counters.getDocumentsFailed());
+ assertEquals(10, counters.getDocumentsOk());
+ assertEquals(0, counters.getDocumentsSkipped());
+ }
+
+
+ private static void copyToHdfs(String localFile, Path hdfsDir, String hdfsName) throws IOException {
+ Path hdfsPath = new Path(hdfsDir, hdfsName);
+ FSDataOutputStream out = hdfs.create(hdfsPath);
+
+ try (InputStream in = new BufferedInputStream(new FileInputStream(localFile))) {
+ int len;
+ byte[] buffer = new byte[1024];
+ while ((len = in.read(buffer)) > 0) {
+ out.write(buffer, 0, len);
+ }
+ } finally {
+ out.close();
+ }
+ }
+
+ public static class FeedMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ context.write(key, value);
+ }
+ }
+
+ public static class FeedReducer extends Reducer<Object, Text, LongWritable, Text> {
+ public void reduce(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ context.write(key, value);
+ }
+ }
+
+ public static class ParsingMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ String line = value.toString();
+ if (line == null || line.length() == 0)
+ return;
+
+ StringTokenizer tokenizer = new StringTokenizer(line);
+ long date = Long.parseLong(tokenizer.nextToken());
+ String metricName = tokenizer.nextToken();
+ long metricValue = Long.parseLong(tokenizer.nextToken());
+ String application = tokenizer.nextToken();
+
+ String docid = "id:"+application+":metric::"+metricName+"-"+date;
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+
+ g.writeStartObject();
+ g.writeObjectFieldStart("fields");
+ g.writeNumberField("date", date);
+ g.writeStringField("name", metricName);
+ g.writeNumberField("value", metricValue);
+ g.writeStringField("application", application);
+ g.writeEndObject();
+ g.writeStringField("put", docid);
+ g.writeEndObject();
+ g.close();
+
+ context.write(key, new Text(out.toString()));
+ }
+ }
+
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
new file mode 100644
index 00000000000..0e1a8fba17c
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
@@ -0,0 +1,274 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+@SuppressWarnings("serial")
+public class VespaDocumentOperationTest {
+
+ @Test
+ public void requireThatUDFReturnsCorrectJson() throws Exception {
+ String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>");
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.path("fields");
+
+ // operation put is default
+ assertEquals("id:testapp:metrics::clicks-20160112", root.get("put").getTextValue());
+ assertEquals("testapp", fields.get("application").getTextValue());
+ assertEquals("clicks", fields.get("name").getTextValue());
+ assertEquals(3, fields.get("value").getIntValue());
+ }
+
+
+ @Test
+ public void requireThatUDFSupportsUpdateAssign() throws IOException {
+ String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "operation=update");
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.path("fields");
+
+ assertEquals("id:testapp:metrics::clicks-20160112", root.get("update").getTextValue());
+ assertEquals("testapp", fields.get("application").get("assign").getTextValue());
+ assertEquals("clicks", fields.get("name").get("assign").getTextValue());
+ assertEquals(3, fields.get("value").get("assign").getIntValue());
+ }
+
+
+ @Test
+ public void requireThatUDFReturnsNullForMissingConfig() throws Exception {
+ String json = getDocumentOperationJson();
+ assertNull(json);
+ }
+
+
+ @Test
+ public void requireThatUDFCorrectlyGeneratesRemoveOperation() throws Exception {
+ String json = getDocumentOperationJson("operation=remove", "docid=id:<application>:metrics::<name>-<date>");
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+
+ assertEquals("id:testapp:metrics::clicks-20160112", root.get("remove").getTextValue());
+ assertNull(fields);
+ }
+
+
+ @Test
+ public void requireThatUDFGeneratesComplexDataTypes() throws Exception {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ Tuple intTuple = TupleFactory.getInstance().newTuple();
+ int[] intArray = {1, 2, 3};
+ for (int i : intArray) { intTuple.append(i); }
+
+ Tuple stringTuple = TupleFactory.getInstance().newTuple();
+ String[] stringArray = {"a", "b", "c"};
+ for (String s : stringArray) { stringTuple.append(s); }
+
+ DataBag bag = new SortedDataBag(null);
+ bag.add(intTuple);
+ bag.add(stringTuple);
+
+ Map<String, Object> innerMap = new HashMap<String, Object>() {{
+ put("a", "string");
+ put("tuple", intTuple);
+ }};
+
+ DataByteArray bytes = new DataByteArray("testdata".getBytes());
+
+ Map<String, Object> outerMap = new HashMap<String, Object>() {{
+ put("string", "value");
+ put("int", 3);
+ put("float", 3.145);
+ put("bool", true);
+ put("byte", bytes);
+ put("map", innerMap);
+ put("bag", bag);
+ }};
+
+ addToTuple("map", DataType.MAP, outerMap, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty");
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+ JsonNode map = fields.get("map");
+
+ assertEquals("value", map.get("string").getTextValue());
+ assertEquals(3, map.get("int").getIntValue());
+ assertEquals(3.145, map.get("float").getDoubleValue(), 1e-6);
+ assertEquals(true, map.get("bool").getBooleanValue());
+ assertEquals("dGVzdGRhdGE=", map.get("byte").getTextValue());
+
+ assertEquals("string", map.get("map").get("a").getTextValue());
+ for (int i = 0; i < intArray.length; ++i) {
+ assertEquals(intArray[i], map.get("map").get("tuple").get(i).asInt());
+ }
+
+ JsonNode bagField = map.get("bag");
+ for (int i = 0; i < intArray.length; ++i) {
+ assertEquals(intArray[i], bagField.get(0).get(i).asInt());
+ }
+ for (int i = 0; i < stringArray.length; ++i) {
+ assertEquals(stringArray[i], bagField.get(1).get(i).asText());
+ }
+ }
+
+
+ @Test
+ public void requireThatSimpleArraysMustBeConfigured() throws Exception {
+ String[] stringArray = {"a", "b", "c"};
+ JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty"); // simple arrays not configured
+ // json: [["a"], ["b"], ["c"]]
+ assertEquals("a", array.get(0).get(0).asText());
+ assertEquals("b", array.get(1).get(0).asText());
+ assertEquals("c", array.get(2).get(0).asText());
+ }
+
+
+ @Test
+ public void requireThatSimpleArraysAreSupported() throws Exception {
+ String[] stringArray = {"a", "b", "c"};
+ JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=array");
+ // json: ["a", "b", "c"]
+ assertEquals("a", array.get(0).asText());
+ assertEquals("b", array.get(1).asText());
+ assertEquals("c", array.get(2).asText());
+ }
+
+
+ @Test
+ public void requireThatSimpleArraysCanBeConfiguredWithWildcard() throws Exception {
+ String[] stringArray = {"a", "b", "c"};
+ JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=*");
+ // json: ["a", "b", "c"]
+ assertEquals("a", array.get(0).asText());
+ assertEquals("b", array.get(1).asText());
+ assertEquals("c", array.get(2).asText());
+ }
+
+
+ @Test
+ public void requireThatMultipleSimpleArraysAreSupported() throws Exception {
+ String[] stringArray = {"a", "b", "c"};
+ JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=empty,array");
+ // json: ["a", "b", "c"]
+ assertEquals("a", array.get(0).asText());
+ assertEquals("b", array.get(1).asText());
+ assertEquals("c", array.get(2).asText());
+ }
+
+
+ private JsonNode setupSimpleArrayOperation(String name, String[] array, String... params) throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ DataBag bag = new SortedDataBag(null);
+ for (String s : array) {
+ Tuple stringTuple = TupleFactory.getInstance().newTuple();
+ stringTuple.append(s);
+ bag.add(stringTuple);
+ }
+ addToTuple(name, DataType.BAG, bag, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation(params);
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+ return fields.get(name);
+ }
+
+
+ @Test
+ public void requireThatUDFSupportsTensors() throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ // Please refer to the tensor format documentation
+
+ Map<String, Double> tensor = new HashMap<String, Double>() {{
+ put("x:label1,y:label2,z:label4", 2.0);
+ put("x:label3", 3.0);
+ }};
+
+ addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
+ addToTuple("tensor", DataType.MAP, tensor, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "create-tensor-fields=tensor");
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+ JsonNode tensorNode = fields.get("tensor");
+ JsonNode cells = tensorNode.get("cells");
+
+ assertEquals("label1", cells.get(0).get("address").get("x").asText());
+ assertEquals("label2", cells.get(0).get("address").get("y").asText());
+ assertEquals("label4", cells.get(0).get("address").get("z").asText());
+ assertEquals("label3", cells.get(1).get("address").get("x").asText());
+
+ assertEquals(2.0, cells.get(0).get("value").asDouble(), 1e-6);
+ assertEquals(3.0, cells.get(1).get("value").asDouble(), 1e-6);
+ }
+
+
+ @Test
+ public void requireThatUDFCanExcludeFields() throws IOException {
+ String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "exclude-fields=application,date");
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.path("fields");
+
+ // 'application' and 'date' fields should not appear in JSON
+ assertNull(fields.get("application"));
+ assertNull(fields.get("date"));
+ assertNotNull(fields.get("name"));
+ assertNotNull(fields.get("value"));
+ }
+
+
+ private String getDocumentOperationJson(String... params) throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ addToTuple("application", DataType.CHARARRAY, "testapp", schema, tuple);
+ addToTuple("name", DataType.CHARARRAY, "clicks", schema, tuple);
+ addToTuple("date", DataType.CHARARRAY, "20160112", schema, tuple);
+ addToTuple("value", DataType.CHARARRAY, 3, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation(params);
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ return json;
+ }
+
+
+ private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) {
+ schema.add(new Schema.FieldSchema(alias, type));
+ tuple.append(value);
+ }
+
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
new file mode 100644
index 00000000000..0f123904cfb
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
@@ -0,0 +1,96 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.sun.net.httpserver.HttpServer;
+import com.yahoo.vespa.hadoop.util.MockQueryHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class VespaQueryTest {
+
+ @Test
+ public void requireThatQueriesAreReturnedCorrectly() throws Exception {
+ runQueryTest("src/test/pig/query.pig", createQueryHandler(""), 18901);
+ }
+
+ @Test
+ public void requireThatQueriesAreReturnedCorrectlyWithAlternativeJsonRoot() throws Exception {
+ runQueryTest("src/test/pig/query_alt_root.pig", createQueryHandler("children"), 18902);
+ }
+
+ private void runQueryTest(String script, MockQueryHandler queryHandler, int port) throws Exception {
+ final String endpoint = "http://localhost:" + port;
+
+ HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
+ server.createContext("/", queryHandler);
+ server.start();
+
+ PigServer ps = setup(script, endpoint);
+
+ Iterator<Tuple> recommendations = ps.openIterator("recommendations");
+ while (recommendations.hasNext()) {
+ Tuple tuple = recommendations.next();
+
+ String userid = (String) tuple.get(0);
+ Integer rank = (Integer) tuple.get(1);
+ String docid = (String) tuple.get(2);
+ Double relevance = (Double) tuple.get(3);
+ String fieldId = (String) tuple.get(4);
+ String fieldContent = (String) tuple.get(5);
+
+ MockQueryHandler.MockQueryHit hit = queryHandler.getHit(userid, rank);
+ assertEquals(docid, hit.id);
+ assertEquals(relevance, hit.relevance, 1e-3);
+ assertEquals(fieldId, hit.fieldId);
+ assertEquals(fieldContent, hit.fieldContent);
+ }
+
+ if (server != null) {
+ server.stop(0);
+ }
+
+ }
+
+ private PigServer setup(String script, String endpoint) throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ Map<String, String> parameters = new HashMap<>();
+ parameters.put("ENDPOINT", endpoint);
+
+ PigServer ps = new PigServer(ExecType.LOCAL, conf);
+ ps.setBatchOn();
+ ps.registerScript(script, parameters);
+
+ return ps;
+ }
+
+ private MockQueryHandler createQueryHandler(String childNode) {
+ MockQueryHandler queryHandler = new MockQueryHandler(childNode);
+
+ List<String> userIds = Arrays.asList("5", "104", "313");
+
+ int hitsPerUser = 3;
+ for (int i = 0; i < hitsPerUser * userIds.size(); ++i) {
+ String id = "" + (i+1);
+ String userId = userIds.get(i / hitsPerUser);
+ queryHandler.newHit().
+ setId("id::::" + id).
+ setRelevance(1.0 - (i % hitsPerUser) * 0.1).
+ setFieldSddocname("doctype").
+ setFieldId("" + id).
+ setFieldDate("2016060" + id).
+ setFieldContent("Content for user " + userId + " hit " + i % hitsPerUser + "...").
+ add(userId);
+ }
+
+ return queryHandler;
+ }
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
new file mode 100644
index 00000000000..322c729b8c5
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
@@ -0,0 +1,110 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
+import org.junit.Test;
+
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+
+
+public class VespaStorageTest {
+
+ @Test
+ public void requireThatPremadeXmlOperationsFeedSucceeds() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.set(VespaConfiguration.DATA_FORMAT, "xml");
+ assertAllDocumentsOk("src/test/pig/feed_operations_xml.pig", conf);
+ }
+
+
+ @Test
+ public void requireThatPremadeOperationsFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_operations.pig");
+ }
+
+
+ @Test
+ public void requireThatPremadeMultilineOperationsFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_multiline_operations.pig");
+ }
+
+
+ @Test
+ public void requireThatPremadeOperationsWithJsonLoaderFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig");
+ }
+
+
+ @Test
+ public void requireThatCreateOperationsFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_create_operations.pig");
+ }
+
+
+ @Test
+ public void requireThatCreateOperationsShortFormFeedSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_create_operations_short_form.pig");
+ }
+
+
+ @Test
+ public void requireThatFeedVisitDataSucceeds() throws Exception {
+ assertAllDocumentsOk("src/test/pig/feed_visit_data.pig");
+ }
+
+
+ private PigServer setup(String script, Configuration conf) throws Exception {
+ if (conf == null) {
+ conf = new HdfsConfiguration();
+ }
+ conf.setIfUnset(VespaConfiguration.DRYRUN, "true");
+ conf.setIfUnset(VespaConfiguration.ENDPOINT, "dummy-endpoint");
+
+ // Parameter substitutions - can also be set by configuration
+ Map<String, String> parameters = new HashMap<>();
+ parameters.put("ENDPOINT", "endpoint-does-not-matter-in-dryrun,another-endpoint-that-does-not-matter");
+
+ PigServer ps = new PigServer(ExecType.LOCAL, conf);
+ ps.setBatchOn();
+ ps.registerScript(script, parameters);
+
+ return ps;
+ }
+
+
+ private void assertAllDocumentsOk(String script) throws Exception {
+ assertAllDocumentsOk(script, null);
+ }
+
+
+ private void assertAllDocumentsOk(String script, Configuration conf) throws Exception {
+ PigServer ps = setup(script, conf);
+ List<ExecJob> jobs = ps.executeBatch();
+ PigStats stats = jobs.get(0).getStatistics();
+ for (JobStats js : stats.getJobGraph()) {
+ Counters hadoopCounters = ((MRJobStats)js).getHadoopCounters();
+ assertNotNull(hadoopCounters);
+ VespaCounters counters = VespaCounters.get(hadoopCounters);
+ assertEquals(10, counters.getDocumentsSent());
+ assertEquals(0, counters.getDocumentsFailed());
+ assertEquals(10, counters.getDocumentsOk());
+ }
+ }
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java
new file mode 100644
index 00000000000..0bf9f6b447e
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java
@@ -0,0 +1,218 @@
+package com.yahoo.vespa.hadoop.util;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MockQueryHandler implements HttpHandler {
+
+ private final Map<String, List<MockQueryHit>> hitMap;
+ private final String childNode;
+
+ public MockQueryHandler(String childNode) {
+ this.hitMap = new HashMap<>();
+ this.childNode = childNode;
+ }
+
+ public void handle(HttpExchange t) throws IOException {
+ URI uri = t.getRequestURI();
+ String query = uri.getQuery();
+ String response = null;
+
+ // Parse query - extract "query" element
+ if (query != null) {
+ String params[] = query.split("[&]");
+ for (String param : params) {
+ int i = param.indexOf('=');
+ String name = param.substring(0, i);
+ String value = URLDecoder.decode(param.substring(i + 1), "UTF-8");
+
+ if ("query".equalsIgnoreCase(name)) {
+ response = getResponse(URLDecoder.decode(param.substring(i + 1), "UTF-8"));
+ }
+ }
+ }
+
+ t.sendResponseHeaders(200, response == null ? 0 : response.length());
+ OutputStream os = t.getResponseBody();
+ os.write(response == null ? "".getBytes() : response.getBytes());
+ os.close();
+
+ }
+
+ public MockQueryHit getHit(String query, Integer rank) {
+ if (!hitMap.containsKey(query)) {
+ return null;
+ }
+ if (rank >= hitMap.get(query).size()) {
+ return null;
+ }
+ return hitMap.get(query).get(rank);
+ }
+
+ public MockQueryHit newHit() {
+ return new MockQueryHit(this);
+ }
+
+ public void addHit(String query, MockQueryHit hit) {
+ if (!hitMap.containsKey(query)) {
+ hitMap.put(query, new ArrayList<>());
+ }
+ hitMap.get(query).add(hit);
+ }
+
+ private String getResponse(String query) throws IOException {
+ List<MockQueryHit> hits = hitMap.get(query);
+ if (hits == null) {
+ return null;
+ }
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+
+ writeResultStart(g, hits.size());
+ for (MockQueryHit hit : hits) {
+ writeHit(g, hit);
+ }
+ writeResultsEnd(g);
+ g.close();
+
+ return out.toString();
+ }
+
+ private void writeHit(JsonGenerator g, MockQueryHit hit) throws IOException {
+ g.writeStartObject();
+
+ g.writeFieldName("id");
+ g.writeString(hit.id);
+
+ g.writeFieldName("relevance");
+ g.writeNumber(hit.relevance);
+
+ g.writeFieldName("fields");
+ g.writeStartObject();
+
+ g.writeFieldName("sddocname");
+ g.writeString(hit.fieldSddocname);
+
+ g.writeFieldName("date");
+ g.writeString(hit.fieldDate);
+
+ g.writeFieldName("content");
+ g.writeString(hit.fieldContent);
+
+ g.writeFieldName("id");
+ g.writeString(hit.fieldId);
+
+ g.writeEndObject();
+ g.writeEndObject();
+ }
+
+ private void writeResultStart(JsonGenerator g, int count) throws IOException {
+ g.writeStartObject();
+ g.writeFieldName("root");
+
+ g.writeStartObject();
+
+ g.writeFieldName("id");
+ g.writeString("toplevel");
+
+ g.writeFieldName("relevance");
+ g.writeNumber(1);
+
+ g.writeFieldName("fields");
+ g.writeStartObject();
+ g.writeFieldName("totalCount");
+ g.writeNumber(count);
+ g.writeEndObject();
+
+ g.writeFieldName("coverage");
+ g.writeStartObject();
+ g.writeFieldName("coverage");
+ g.writeNumber(100);
+ // ... more stuff here usually
+ g.writeEndObject();
+
+ g.writeFieldName("children");
+ g.writeStartArray();
+
+ if (!childNode.isEmpty()) {
+ g.writeStartObject();
+ g.writeFieldName(childNode);
+ g.writeStartArray();
+ }
+ }
+
+ private void writeResultsEnd(JsonGenerator g) throws IOException {
+ if (!childNode.isEmpty()) {
+ g.writeEndArray();
+ g.writeEndObject();
+ }
+ g.writeEndArray();
+ g.writeEndObject();
+ g.writeEndObject();
+ }
+
+ public static class MockQueryHit {
+
+ private final MockQueryHandler handler;
+
+ public String id;
+ public Double relevance;
+ public String fieldSddocname;
+ public String fieldDate;
+ public String fieldContent;
+ public String fieldId;
+
+ private MockQueryHit(MockQueryHandler handler) {
+ this.handler = handler;
+ }
+
+ public void add(String query) {
+ handler.addHit(query, this);
+ }
+
+ public MockQueryHit setId(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public MockQueryHit setRelevance(Double relevance) {
+ this.relevance = relevance;
+ return this;
+ }
+
+ public MockQueryHit setFieldSddocname(String fieldSddocname) {
+ this.fieldSddocname = fieldSddocname;
+ return this;
+ }
+
+ public MockQueryHit setFieldDate(String fieldDate) {
+ this.fieldDate = fieldDate;
+ return this;
+ }
+
+ public MockQueryHit setFieldContent(String fieldContent) {
+ this.fieldContent = fieldContent;
+ return this;
+ }
+
+ public MockQueryHit setFieldId(String fieldId) {
+ this.fieldId = fieldId;
+ return this;
+ }
+ }
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
new file mode 100644
index 00000000000..c98e7b1c02c
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
@@ -0,0 +1,50 @@
+package com.yahoo.vespa.hadoop.util;
+
+import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TupleToolsTest {
+
+ @Test
+ public void requireThatTupleToStringHandlesSimpleTypes() throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
+ addToTuple("rank", DataType.INTEGER, 1, schema, tuple);
+
+ String template = "Id is <id> and rank is <rank>";
+ String result = TupleTools.toString(schema, tuple, template);
+
+ assertEquals("Id is 123 and rank is 1", result);
+ }
+
+
+ private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) {
+ schema.add(new Schema.FieldSchema(alias, type));
+ tuple.append(value);
+ }
+
+ @Test
+ public void requireThatTupleToStringHandlesStringCharacters() throws IOException {
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+
+ addToTuple("id", DataType.CHARARRAY, "_!@#$%^&*()", schema, tuple);
+ addToTuple("rank", DataType.INTEGER, 1, schema, tuple);
+
+ String template = "Id is <id> and rank is <rank>";
+ String result = TupleTools.toString(schema, tuple, template);
+
+ assertEquals("Id is _!@#$%^&*() and rank is 1", result);
+ }
+
+}