Diffstat (limited to 'vespa-hadoop/src/test/java')
 vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java              | 197
 vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java | 274
 vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java             |  96
 vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java           | 110
 vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java          | 218
 vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java            |  50
6 files changed, 945 insertions, 0 deletions
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
new file mode 100644
index 00000000000..a79f63a77ce
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
@@ -0,0 +1,197 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.test.PathUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MapReduceTest {
+
+    protected static File hdfsBaseDir;
+    protected static FileSystem hdfs;
+    protected static Configuration conf;
+    protected static MiniDFSCluster cluster;
+
+    protected static Path metricsJsonPath;
+    protected static Path metricsCsvPath;
+
+    @BeforeClass
+    public static void setUp() throws IOException {
+        hdfsBaseDir = new File(PathUtils.getTestDir(MapReduceTest.class).getCanonicalPath());
+
+        conf = new HdfsConfiguration();
+        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBaseDir.getAbsolutePath());
+        conf.set(VespaConfiguration.DRYRUN, "true");
+        conf.set(VespaConfiguration.ENDPOINT, "endpoint-does-not-matter-in-dryrun");
+
+        cluster = new MiniDFSCluster.Builder(conf).build();
+        hdfs = FileSystem.get(conf);
+
+        metricsJsonPath = new Path("metrics_json");
+        metricsCsvPath = new Path("metrics_csv");
+        copyToHdfs("src/test/resources/operations_data.json", metricsJsonPath, "data");
+        copyToHdfs("src/test/resources/tabular_data.csv", metricsCsvPath, "data");
+    }
+
+    @AfterClass
+    public static void tearDown() throws IOException {
+        Path testDir = new Path(hdfsBaseDir.getParent());
+        hdfs.delete(testDir, true);
+        cluster.shutdown();
+        LocalFileSystem localFileSystem = FileSystem.getLocal(conf);
+        localFileSystem.delete(testDir, true);
+    }
+
+    @Test
+    public void requireThatMapOnlyJobSucceeds() throws Exception {
+        Job job = Job.getInstance(conf);
+        job.setJarByClass(MapReduceTest.class);
+        job.setMapperClass(FeedMapper.class);
+        job.setOutputFormatClass(VespaOutputFormat.class);
+        job.setMapOutputValueClass(Text.class);
+
+        FileInputFormat.setInputPaths(job, metricsJsonPath);
+
+        boolean success = job.waitForCompletion(true);
+        assertTrue("Job Failed", success);
+
+        VespaCounters counters = VespaCounters.get(job);
+        assertEquals(10, counters.getDocumentsSent());
+        assertEquals(0, counters.getDocumentsFailed());
+        assertEquals(10, counters.getDocumentsOk());
+    }
+
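+    // Same feed pipeline as above, but run through an explicit identity reduce stage.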
+    @Test
+    public void requireThatMapReduceJobSucceeds() throws Exception {
+        Job job = Job.getInstance(conf);
+        job.setJarByClass(MapReduceTest.class);
+        job.setMapperClass(FeedMapper.class);
+        job.setOutputFormatClass(VespaOutputFormat.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setReducerClass(FeedReducer.class);
+        job.setNumReduceTasks(1);
+
+        FileInputFormat.setInputPaths(job, metricsJsonPath);
+
+        boolean success = job.waitForCompletion(true);
+        assertTrue("Job Failed", success);
+
+        VespaCounters counters = VespaCounters.get(job);
+        assertEquals(10, counters.getDocumentsSent());
+        assertEquals(0, counters.getDocumentsFailed());
+        assertEquals(10, counters.getDocumentsOk());
+    }
+
+    @Test
+    public void requireThatTransformMapJobSucceeds() throws Exception {
+        Job job = Job.getInstance(conf);
+        job.setJarByClass(MapReduceTest.class);
+        job.setMapperClass(ParsingMapper.class);
+        job.setOutputFormatClass(VespaOutputFormat.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setReducerClass(FeedReducer.class);
+        job.setNumReduceTasks(1);
+
+        FileInputFormat.setInputPaths(job, metricsCsvPath);
+
+        boolean success = job.waitForCompletion(true);
+        assertTrue("Job Failed", success);
+
+        VespaCounters counters = VespaCounters.get(job);
+        assertEquals(10, counters.getDocumentsSent());
+        assertEquals(0, counters.getDocumentsFailed());
+        assertEquals(10, counters.getDocumentsOk());
+        assertEquals(0, counters.getDocumentsSkipped());
+    }
+
+    private static void copyToHdfs(String localFile, Path hdfsDir, String hdfsName) throws IOException {
+        Path hdfsPath = new Path(hdfsDir, hdfsName);
+        try (InputStream in = new BufferedInputStream(new FileInputStream(localFile));
+             FSDataOutputStream out = hdfs.create(hdfsPath)) {
+            int len;
+            byte[] buffer = new byte[1024];
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+        }
+    }
+
+    public static class FeedMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            context.write(key, value);
+        }
+    }
+
+    public static class FeedReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
+        // Identity reducer. Note that the method must take Iterable<Text> to actually
+        // override Reducer.reduce; a (LongWritable, Text) variant would never be called.
+        @Override
+        public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+            for (Text value : values) {
+                context.write(key, value);
+            }
+        }
+    }
+
+    public static class ParsingMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            String line = value.toString();
+            if (line == null || line.isEmpty())
+                return;
+
+            StringTokenizer tokenizer = new StringTokenizer(line);
+            long date = Long.parseLong(tokenizer.nextToken());
+            String metricName = tokenizer.nextToken();
+            long metricValue = Long.parseLong(tokenizer.nextToken());
+            String application = tokenizer.nextToken();
+
+            String docid = "id:" + application + ":metric::" + metricName + "-" + date;
+
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+
+            g.writeStartObject();
+            g.writeObjectFieldStart("fields");
+            g.writeNumberField("date", date);
+            g.writeStringField("name", metricName);
+            g.writeNumberField("value", metricValue);
+            g.writeStringField("application", application);
+            g.writeEndObject();
+            g.writeStringField("put", docid);
+            g.writeEndObject();
+            g.close();
+
+            context.write(key, new Text(out.toString("UTF-8")));
+        }
+    }
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
new file mode 100644
index 00000000000..0e1a8fba17c
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
@@ -0,0 +1,274 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+@SuppressWarnings("serial")
+public class VespaDocumentOperationTest {
+
+    @Test
+    public void requireThatUDFReturnsCorrectJson() throws Exception {
+        String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>");
+        ObjectMapper m = new ObjectMapper();
+        JsonNode root = m.readTree(json);
+        JsonNode fields = root.path("fields");
+
+        // The "put" operation is the default
+        assertEquals("id:testapp:metrics::clicks-20160112", root.get("put").getTextValue());
+        assertEquals("testapp", fields.get("application").getTextValue());
+        assertEquals("clicks", fields.get("name").getTextValue());
+        assertEquals(3, fields.get("value").getIntValue());
+    }
+
+    @Test
+    public void requireThatUDFSupportsUpdateAssign() throws IOException {
+        String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "operation=update");
+        ObjectMapper m = new ObjectMapper();
+        JsonNode root = m.readTree(json);
+        JsonNode fields = root.path("fields");
+
+        assertEquals("id:testapp:metrics::clicks-20160112", root.get("update").getTextValue());
+        assertEquals("testapp", fields.get("application").get("assign").getTextValue());
+        assertEquals("clicks", fields.get("name").get("assign").getTextValue());
+        assertEquals(3, fields.get("value").get("assign").getIntValue());
+    }
+
+    @Test
+    public void requireThatUDFReturnsNullForMissingConfig() throws Exception {
+        String json = getDocumentOperationJson();
+        assertNull(json);
+    }
+
+    @Test
+    public void requireThatUDFCorrectlyGeneratesRemoveOperation() throws Exception {
+        String json = getDocumentOperationJson("operation=remove", "docid=id:<application>:metrics::<name>-<date>");
+        ObjectMapper m = new ObjectMapper();
+        JsonNode root = m.readTree(json);
+        JsonNode fields = root.get("fields");
+
+        assertEquals("id:testapp:metrics::clicks-20160112", root.get("remove").getTextValue());
+        assertNull(fields);
+    }
+
+    @Test
+    public void requireThatUDFGeneratesComplexDataTypes() throws Exception {
+        Schema schema = new Schema();
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+
+        Tuple intTuple = TupleFactory.getInstance().newTuple();
+        int[] intArray = {1, 2, 3};
+        for (int i : intArray) { intTuple.append(i); }
+
+        Tuple stringTuple = TupleFactory.getInstance().newTuple();
+        String[] stringArray = {"a", "b", "c"};
+        for (String s : stringArray) { stringTuple.append(s); }
+
+        DataBag bag = new SortedDataBag(null);
+        bag.add(intTuple);
+        bag.add(stringTuple);
+
+        Map<String, Object> innerMap = new HashMap<String, Object>() {{
+            put("a", "string");
+            put("tuple", intTuple);
+        }};
+
+        DataByteArray bytes = new DataByteArray("testdata".getBytes());
+
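+        // The outer map mixes primitives, a byte array, a nested map and a bag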
put("int", 3); + put("float", 3.145); + put("bool", true); + put("byte", bytes); + put("map", innerMap); + put("bag", bag); + }}; + + addToTuple("map", DataType.MAP, outerMap, schema, tuple); + + VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty"); + docOp.setInputSchema(schema); + String json = docOp.exec(tuple); + + ObjectMapper m = new ObjectMapper(); + JsonNode root = m.readTree(json); + JsonNode fields = root.get("fields"); + JsonNode map = fields.get("map"); + + assertEquals("value", map.get("string").getTextValue()); + assertEquals(3, map.get("int").getIntValue()); + assertEquals(3.145, map.get("float").getDoubleValue(), 1e-6); + assertEquals(true, map.get("bool").getBooleanValue()); + assertEquals("dGVzdGRhdGE=", map.get("byte").getTextValue()); + + assertEquals("string", map.get("map").get("a").getTextValue()); + for (int i = 0; i < intArray.length; ++i) { + assertEquals(intArray[i], map.get("map").get("tuple").get(i).asInt()); + } + + JsonNode bagField = map.get("bag"); + for (int i = 0; i < intArray.length; ++i) { + assertEquals(intArray[i], bagField.get(0).get(i).asInt()); + } + for (int i = 0; i < stringArray.length; ++i) { + assertEquals(stringArray[i], bagField.get(1).get(i).asText()); + } + } + + + @Test + public void requireThatSimpleArraysMustBeConfigured() throws Exception { + String[] stringArray = {"a", "b", "c"}; + JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty"); // simple arrays not configured + // json: [["a"], ["b"], ["c"]] + assertEquals("a", array.get(0).get(0).asText()); + assertEquals("b", array.get(1).get(0).asText()); + assertEquals("c", array.get(2).get(0).asText()); + } + + + @Test + public void requireThatSimpleArraysAreSupported() throws Exception { + String[] stringArray = {"a", "b", "c"}; + JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=array"); + // json: ["a", "b", "c"] + assertEquals("a", array.get(0).asText()); + assertEquals("b", array.get(1).asText()); + assertEquals("c", array.get(2).asText()); + } + + + @Test + public void requireThatSimpleArraysCanBeConfiguredWithWildcard() throws Exception { + String[] stringArray = {"a", "b", "c"}; + JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=*"); + // json: ["a", "b", "c"] + assertEquals("a", array.get(0).asText()); + assertEquals("b", array.get(1).asText()); + assertEquals("c", array.get(2).asText()); + } + + + @Test + public void requireThatMultipleSimpleArraysAreSupported() throws Exception { + String[] stringArray = {"a", "b", "c"}; + JsonNode array = setupSimpleArrayOperation("array", stringArray, "docid=empty", "simple-array-fields=empty,array"); + // json: ["a", "b", "c"] + assertEquals("a", array.get(0).asText()); + assertEquals("b", array.get(1).asText()); + assertEquals("c", array.get(2).asText()); + } + + + private JsonNode setupSimpleArrayOperation(String name, String[] array, String... 
+    private JsonNode setupSimpleArrayOperation(String name, String[] array, String... params) throws IOException {
+        Schema schema = new Schema();
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+
+        DataBag bag = new SortedDataBag(null);
+        for (String s : array) {
+            Tuple stringTuple = TupleFactory.getInstance().newTuple();
+            stringTuple.append(s);
+            bag.add(stringTuple);
+        }
+        addToTuple(name, DataType.BAG, bag, schema, tuple);
+
+        VespaDocumentOperation docOp = new VespaDocumentOperation(params);
+        docOp.setInputSchema(schema);
+        String json = docOp.exec(tuple);
+
+        ObjectMapper m = new ObjectMapper();
+        JsonNode root = m.readTree(json);
+        JsonNode fields = root.get("fields");
+        return fields.get(name);
+    }
+
+    @Test
+    public void requireThatUDFSupportsTensors() throws IOException {
+        Schema schema = new Schema();
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+
+        // Please refer to the tensor format documentation
+        Map<String, Double> tensor = new HashMap<String, Double>() {{
+            put("x:label1,y:label2,z:label4", 2.0);
+            put("x:label3", 3.0);
+        }};
+
+        addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
+        addToTuple("tensor", DataType.MAP, tensor, schema, tuple);
+
+        VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "create-tensor-fields=tensor");
+        docOp.setInputSchema(schema);
+        String json = docOp.exec(tuple);
+
+        ObjectMapper m = new ObjectMapper();
+        JsonNode root = m.readTree(json);
+        JsonNode fields = root.get("fields");
+        JsonNode tensorNode = fields.get("tensor");
+        JsonNode cells = tensorNode.get("cells");
+
+        assertEquals("label1", cells.get(0).get("address").get("x").asText());
+        assertEquals("label2", cells.get(0).get("address").get("y").asText());
+        assertEquals("label4", cells.get(0).get("address").get("z").asText());
+        assertEquals("label3", cells.get(1).get("address").get("x").asText());
+
+        assertEquals(2.0, cells.get(0).get("value").asDouble(), 1e-6);
+        assertEquals(3.0, cells.get(1).get("value").asDouble(), 1e-6);
+    }
+
+    @Test
+    public void requireThatUDFCanExcludeFields() throws IOException {
+        String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "exclude-fields=application,date");
+        ObjectMapper m = new ObjectMapper();
+        JsonNode root = m.readTree(json);
+        JsonNode fields = root.path("fields");
+
+        // The 'application' and 'date' fields should not appear in the JSON
+        assertNull(fields.get("application"));
+        assertNull(fields.get("date"));
+        assertNotNull(fields.get("name"));
+        assertNotNull(fields.get("value"));
+    }
+
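+    // Helper: builds the standard four-field test tuple and runs it through the UDF.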
+    private String getDocumentOperationJson(String... params) throws IOException {
+        Schema schema = new Schema();
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+
+        addToTuple("application", DataType.CHARARRAY, "testapp", schema, tuple);
+        addToTuple("name", DataType.CHARARRAY, "clicks", schema, tuple);
+        addToTuple("date", DataType.CHARARRAY, "20160112", schema, tuple);
+        addToTuple("value", DataType.CHARARRAY, 3, schema, tuple);
+
+        VespaDocumentOperation docOp = new VespaDocumentOperation(params);
+        docOp.setInputSchema(schema);
+        return docOp.exec(tuple);
+    }
+
+    private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) {
+        schema.add(new Schema.FieldSchema(alias, type));
+        tuple.append(value);
+    }
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
new file mode 100644
index 00000000000..0f123904cfb
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaQueryTest.java
@@ -0,0 +1,96 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.sun.net.httpserver.HttpServer;
+import com.yahoo.vespa.hadoop.util.MockQueryHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class VespaQueryTest {
+
+    @Test
+    public void requireThatQueriesAreReturnedCorrectly() throws Exception {
+        runQueryTest("src/test/pig/query.pig", createQueryHandler(""), 18901);
+    }
+
+    @Test
+    public void requireThatQueriesAreReturnedCorrectlyWithAlternativeJsonRoot() throws Exception {
+        runQueryTest("src/test/pig/query_alt_root.pig", createQueryHandler("children"), 18902);
+    }
+
+    private void runQueryTest(String script, MockQueryHandler queryHandler, int port) throws Exception {
+        final String endpoint = "http://localhost:" + port;
+
+        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
+        server.createContext("/", queryHandler);
+        server.start();
+
+        try {
+            PigServer ps = setup(script, endpoint);
+
+            Iterator<Tuple> recommendations = ps.openIterator("recommendations");
+            while (recommendations.hasNext()) {
+                Tuple tuple = recommendations.next();
+
+                String userid = (String) tuple.get(0);
+                Integer rank = (Integer) tuple.get(1);
+                String docid = (String) tuple.get(2);
+                Double relevance = (Double) tuple.get(3);
+                String fieldId = (String) tuple.get(4);
+                String fieldContent = (String) tuple.get(5);
+
+                MockQueryHandler.MockQueryHit hit = queryHandler.getHit(userid, rank);
+                assertEquals(docid, hit.id);
+                assertEquals(relevance, hit.relevance, 1e-3);
+                assertEquals(fieldId, hit.fieldId);
+                assertEquals(fieldContent, hit.fieldContent);
+            }
+        } finally {
+            // Stop the mock server even if an assertion above fails
+            server.stop(0);
+        }
+    }
+
+    private PigServer setup(String script, String endpoint) throws Exception {
+        Configuration conf = new HdfsConfiguration();
+        Map<String, String> parameters = new HashMap<>();
+        parameters.put("ENDPOINT", endpoint);
+
+        PigServer ps = new PigServer(ExecType.LOCAL, conf);
+        ps.setBatchOn();
+        ps.registerScript(script, parameters);
+
+        return ps;
+    }
+
+    private MockQueryHandler createQueryHandler(String childNode) {
+        MockQueryHandler queryHandler = new MockQueryHandler(childNode);
+
+        List<String> userIds = Arrays.asList("5", "104", "313");
+
+        int hitsPerUser = 3;
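+        // Register hitsPerUser hits per user; relevance decreases with rank within each user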
+        for (int i = 0; i < hitsPerUser * userIds.size(); ++i) {
+            String id = "" + (i + 1);
+            String userId = userIds.get(i / hitsPerUser);
+            queryHandler.newHit().
+                    setId("id::::" + id).
+                    setRelevance(1.0 - (i % hitsPerUser) * 0.1).
+                    setFieldSddocname("doctype").
+                    setFieldId("" + id).
+                    setFieldDate("2016060" + id).
+                    setFieldContent("Content for user " + userId + " hit " + i % hitsPerUser + "...").
+                    add(userId);
+        }
+
+        return queryHandler;
+    }
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
new file mode 100644
index 00000000000..322c729b8c5
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
@@ -0,0 +1,110 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
+import org.junit.Test;
+
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+
+public class VespaStorageTest {
+
+    @Test
+    public void requireThatPremadeXmlOperationsFeedSucceeds() throws Exception {
+        Configuration conf = new HdfsConfiguration();
+        conf.set(VespaConfiguration.DATA_FORMAT, "xml");
+        assertAllDocumentsOk("src/test/pig/feed_operations_xml.pig", conf);
+    }
+
+    @Test
+    public void requireThatPremadeOperationsFeedSucceeds() throws Exception {
+        assertAllDocumentsOk("src/test/pig/feed_operations.pig");
+    }
+
+    @Test
+    public void requireThatPremadeMultilineOperationsFeedSucceeds() throws Exception {
+        assertAllDocumentsOk("src/test/pig/feed_multiline_operations.pig");
+    }
+
+    @Test
+    public void requireThatPremadeOperationsWithJsonLoaderFeedSucceeds() throws Exception {
+        assertAllDocumentsOk("src/test/pig/feed_operations_with_json_loader.pig");
+    }
+
+    @Test
+    public void requireThatCreateOperationsFeedSucceeds() throws Exception {
+        assertAllDocumentsOk("src/test/pig/feed_create_operations.pig");
+    }
+
+    @Test
+    public void requireThatCreateOperationsShortFormFeedSucceeds() throws Exception {
+        assertAllDocumentsOk("src/test/pig/feed_create_operations_short_form.pig");
+    }
+
+    @Test
+    public void requireThatFeedVisitDataSucceeds() throws Exception {
+        assertAllDocumentsOk("src/test/pig/feed_visit_data.pig");
+    }
+
+    private PigServer setup(String script, Configuration conf) throws Exception {
+        if (conf == null) {
+            conf = new HdfsConfiguration();
+        }
+        conf.setIfUnset(VespaConfiguration.DRYRUN, "true");
+        conf.setIfUnset(VespaConfiguration.ENDPOINT, "dummy-endpoint");
+
+        // Parameter substitutions - can also be set by configuration
+        Map<String, String> parameters = new HashMap<>();
+        parameters.put("ENDPOINT", "endpoint-does-not-matter-in-dryrun,another-endpoint-that-does-not-matter");
+
+        PigServer ps = new PigServer(ExecType.LOCAL, conf);
+        ps.setBatchOn();
+        ps.registerScript(script, parameters);
+
+        return ps;
+    }
+
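+    // Runs the given script in dry-run mode and verifies that all 10 documents were fed OK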
+    private void assertAllDocumentsOk(String script) throws Exception {
+        assertAllDocumentsOk(script, null);
+    }
+
+    private void assertAllDocumentsOk(String script, Configuration conf) throws Exception {
+        PigServer ps = setup(script, conf);
+        List<ExecJob> jobs = ps.executeBatch();
+        PigStats stats = jobs.get(0).getStatistics();
+        for (JobStats js : stats.getJobGraph()) {
+            Counters hadoopCounters = ((MRJobStats) js).getHadoopCounters();
+            assertNotNull(hadoopCounters);
+            VespaCounters counters = VespaCounters.get(hadoopCounters);
+            assertEquals(10, counters.getDocumentsSent());
+            assertEquals(0, counters.getDocumentsFailed());
+            assertEquals(10, counters.getDocumentsOk());
+        }
+    }
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java
new file mode 100644
index 00000000000..0bf9f6b447e
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/MockQueryHandler.java
@@ -0,0 +1,218 @@
+package com.yahoo.vespa.hadoop.util;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MockQueryHandler implements HttpHandler {
+
+    private final Map<String, List<MockQueryHit>> hitMap;
+    private final String childNode;
+
+    public MockQueryHandler(String childNode) {
+        this.hitMap = new HashMap<>();
+        this.childNode = childNode;
+    }
+
+    @Override
+    public void handle(HttpExchange t) throws IOException {
+        URI uri = t.getRequestURI();
+        String query = uri.getQuery();
+        String response = null;
+
+        // Parse the query string and look up the response for the "query" parameter
+        if (query != null) {
+            String[] params = query.split("[&]");
+            for (String param : params) {
+                int i = param.indexOf('=');
+                String name = param.substring(0, i);
+                String value = URLDecoder.decode(param.substring(i + 1), "UTF-8");
+
+                if ("query".equalsIgnoreCase(name)) {
+                    response = getResponse(value);
+                }
+            }
+        }
+
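+        // The content length must be given in bytes, not characters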
"".getBytes() : response.getBytes()); + os.close(); + + } + + public MockQueryHit getHit(String query, Integer rank) { + if (!hitMap.containsKey(query)) { + return null; + } + if (rank >= hitMap.get(query).size()) { + return null; + } + return hitMap.get(query).get(rank); + } + + public MockQueryHit newHit() { + return new MockQueryHit(this); + } + + public void addHit(String query, MockQueryHit hit) { + if (!hitMap.containsKey(query)) { + hitMap.put(query, new ArrayList<>()); + } + hitMap.get(query).add(hit); + } + + private String getResponse(String query) throws IOException { + List<MockQueryHit> hits = hitMap.get(query); + if (hits == null) { + return null; + } + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8); + + writeResultStart(g, hits.size()); + for (MockQueryHit hit : hits) { + writeHit(g, hit); + } + writeResultsEnd(g); + g.close(); + + return out.toString(); + } + + private void writeHit(JsonGenerator g, MockQueryHit hit) throws IOException { + g.writeStartObject(); + + g.writeFieldName("id"); + g.writeString(hit.id); + + g.writeFieldName("relevance"); + g.writeNumber(hit.relevance); + + g.writeFieldName("fields"); + g.writeStartObject(); + + g.writeFieldName("sddocname"); + g.writeString(hit.fieldSddocname); + + g.writeFieldName("date"); + g.writeString(hit.fieldDate); + + g.writeFieldName("content"); + g.writeString(hit.fieldContent); + + g.writeFieldName("id"); + g.writeString(hit.fieldId); + + g.writeEndObject(); + g.writeEndObject(); + } + + private void writeResultStart(JsonGenerator g, int count) throws IOException { + g.writeStartObject(); + g.writeFieldName("root"); + + g.writeStartObject(); + + g.writeFieldName("id"); + g.writeString("toplevel"); + + g.writeFieldName("relevance"); + g.writeNumber(1); + + g.writeFieldName("fields"); + g.writeStartObject(); + g.writeFieldName("totalCount"); + g.writeNumber(count); + g.writeEndObject(); + + g.writeFieldName("coverage"); + g.writeStartObject(); + g.writeFieldName("coverage"); + g.writeNumber(100); + // ... 
+        g.writeFieldName("children");
+        g.writeStartArray();
+
+        if (!childNode.isEmpty()) {
+            g.writeStartObject();
+            g.writeFieldName(childNode);
+            g.writeStartArray();
+        }
+    }
+
+    private void writeResultsEnd(JsonGenerator g) throws IOException {
+        if (!childNode.isEmpty()) {
+            g.writeEndArray();
+            g.writeEndObject();
+        }
+        g.writeEndArray();
+        g.writeEndObject();
+        g.writeEndObject();
+    }
+
+    public static class MockQueryHit {
+
+        private final MockQueryHandler handler;
+
+        public String id;
+        public Double relevance;
+        public String fieldSddocname;
+        public String fieldDate;
+        public String fieldContent;
+        public String fieldId;
+
+        private MockQueryHit(MockQueryHandler handler) {
+            this.handler = handler;
+        }
+
+        public void add(String query) {
+            handler.addHit(query, this);
+        }
+
+        public MockQueryHit setId(String id) {
+            this.id = id;
+            return this;
+        }
+
+        public MockQueryHit setRelevance(Double relevance) {
+            this.relevance = relevance;
+            return this;
+        }
+
+        public MockQueryHit setFieldSddocname(String fieldSddocname) {
+            this.fieldSddocname = fieldSddocname;
+            return this;
+        }
+
+        public MockQueryHit setFieldDate(String fieldDate) {
+            this.fieldDate = fieldDate;
+            return this;
+        }
+
+        public MockQueryHit setFieldContent(String fieldContent) {
+            this.fieldContent = fieldContent;
+            return this;
+        }
+
+        public MockQueryHit setFieldId(String fieldId) {
+            this.fieldId = fieldId;
+            return this;
+        }
+    }
+
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
new file mode 100644
index 00000000000..c98e7b1c02c
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/util/TupleToolsTest.java
@@ -0,0 +1,50 @@
+package com.yahoo.vespa.hadoop.util;
+
+import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TupleToolsTest {
+
+    @Test
+    public void requireThatTupleToStringHandlesSimpleTypes() throws IOException {
+        Schema schema = new Schema();
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+
+        addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
+        addToTuple("rank", DataType.INTEGER, 1, schema, tuple);
+
+        String template = "Id is <id> and rank is <rank>";
+        String result = TupleTools.toString(schema, tuple, template);
+
+        assertEquals("Id is 123 and rank is 1", result);
+    }
+
+    @Test
+    public void requireThatTupleToStringHandlesStringCharacters() throws IOException {
+        Schema schema = new Schema();
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+
+        addToTuple("id", DataType.CHARARRAY, "_!@#$%^&*()", schema, tuple);
+        addToTuple("rank", DataType.INTEGER, 1, schema, tuple);
+
+        String template = "Id is <id> and rank is <rank>";
+        String result = TupleTools.toString(schema, tuple, template);
+
+        assertEquals("Id is _!@#$%^&*() and rank is 1", result);
+    }
+
+    private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) {
+        schema.add(new Schema.FieldSchema(alias, type));
+        tuple.append(value);
+    }
+
+}