Diffstat (limited to 'vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java')
-rw-r--r-- | vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java | 197
1 file changed, 197 insertions, 0 deletions
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
new file mode 100644
index 00000000000..a79f63a77ce
--- /dev/null
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/MapReduceTest.java
@@ -0,0 +1,197 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.test.PathUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MapReduceTest {
+
+    protected static File hdfsBaseDir;
+    protected static FileSystem hdfs;
+    protected static Configuration conf;
+    protected static MiniDFSCluster cluster;
+
+    protected static Path metricsJsonPath;
+    protected static Path metricsCsvPath;
+
+    @BeforeClass
+    public static void setUp() throws IOException {
+        hdfsBaseDir = new File(PathUtils.getTestDir(MapReduceTest.class).getCanonicalPath());
+
+        conf = new HdfsConfiguration();
+        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBaseDir.getAbsolutePath());
+        conf.set(VespaConfiguration.DRYRUN, "true");
+        conf.set(VespaConfiguration.ENDPOINT, "endpoint-does-not-matter-in-dryrun");
+
+        cluster = new MiniDFSCluster.Builder(conf).build();
+        hdfs = FileSystem.get(conf);
+
+        metricsJsonPath = new Path("metrics_json");
+        metricsCsvPath = new Path("metrics_csv");
+        copyToHdfs("src/test/resources/operations_data.json", metricsJsonPath, "data");
+        copyToHdfs("src/test/resources/tabular_data.csv", metricsCsvPath, "data");
+    }
+
+    @AfterClass
+    public static void tearDown() throws IOException {
+        Path testDir = new Path(hdfsBaseDir.getParent());
+        hdfs.delete(testDir, true);
+        cluster.shutdown();
+        LocalFileSystem localFileSystem = FileSystem.getLocal(conf);
+        localFileSystem.delete(testDir, true);
+    }
+
+    @Test
+    public void requireThatMapOnlyJobSucceeds() throws Exception {
+        Job job = Job.getInstance(conf);
+        job.setJarByClass(MapReduceTest.class);
+        job.setMapperClass(FeedMapper.class);
+        job.setOutputFormatClass(VespaOutputFormat.class);
+        job.setMapOutputValueClass(Text.class);
+
+        FileInputFormat.setInputPaths(job, metricsJsonPath);
+
+        boolean success = job.waitForCompletion(true);
+        assertTrue("Job Failed", success);
+
+        VespaCounters counters = VespaCounters.get(job);
+        assertEquals(10, counters.getDocumentsSent());
+        assertEquals(0, counters.getDocumentsFailed());
+        assertEquals(10, counters.getDocumentsOk());
+    }
+
+    @Test
+    public void requireThatMapReduceJobSucceeds() throws Exception {
+        Job job = Job.getInstance(conf);
+        job.setJarByClass(MapReduceTest.class);
+        job.setMapperClass(FeedMapper.class);
+        job.setOutputFormatClass(VespaOutputFormat.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setReducerClass(FeedReducer.class);
+        job.setNumReduceTasks(1);
+
+        FileInputFormat.setInputPaths(job, metricsJsonPath);
+
+        boolean success = job.waitForCompletion(true);
+        assertTrue("Job Failed", success);
+
+        VespaCounters counters = VespaCounters.get(job);
+        assertEquals(10, counters.getDocumentsSent());
+        assertEquals(0, counters.getDocumentsFailed());
+        assertEquals(10, counters.getDocumentsOk());
+    }
+
+
+    @Test
+    public void requireThatTransformMapJobSucceeds() throws Exception {
+        Job job = Job.getInstance(conf);
+        job.setJarByClass(MapReduceTest.class);
+        job.setMapperClass(ParsingMapper.class);
+        job.setOutputFormatClass(VespaOutputFormat.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setReducerClass(FeedReducer.class);
+        job.setNumReduceTasks(1);
+
+        FileInputFormat.setInputPaths(job, metricsCsvPath);
+
+        boolean success = job.waitForCompletion(true);
+        assertTrue("Job Failed", success);
+
+        VespaCounters counters = VespaCounters.get(job);
+        assertEquals(10, counters.getDocumentsSent());
+        assertEquals(0, counters.getDocumentsFailed());
+        assertEquals(10, counters.getDocumentsOk());
+        assertEquals(0, counters.getDocumentsSkipped());
+    }
+
+
+    private static void copyToHdfs(String localFile, Path hdfsDir, String hdfsName) throws IOException {
+        Path hdfsPath = new Path(hdfsDir, hdfsName);
+        FSDataOutputStream out = hdfs.create(hdfsPath);
+
+        try (InputStream in = new BufferedInputStream(new FileInputStream(localFile))) {
+            int len;
+            byte[] buffer = new byte[1024];
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+        } finally {
+            out.close();
+        }
+    }
+
+    public static class FeedMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            context.write(key, value);
+        }
+    }
+
+    public static class FeedReducer extends Reducer<Object, Text, LongWritable, Text> {
+        public void reduce(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            context.write(key, value);
+        }
+    }
+
+    public static class ParsingMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            String line = value.toString();
+            if (line == null || line.length() == 0)
+                return;
+
+            StringTokenizer tokenizer = new StringTokenizer(line);
+            long date = Long.parseLong(tokenizer.nextToken());
+            String metricName = tokenizer.nextToken();
+            long metricValue = Long.parseLong(tokenizer.nextToken());
+            String application = tokenizer.nextToken();
+
+            String docid = "id:"+application+":metric::"+metricName+"-"+date;
+
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+
+            g.writeStartObject();
+            g.writeObjectFieldStart("fields");
+            g.writeNumberField("date", date);
+            g.writeStringField("name", metricName);
+            g.writeNumberField("value", metricValue);
+            g.writeStringField("application", application);
+            g.writeEndObject();
+            g.writeStringField("put", docid);
+            g.writeEndObject();
+            g.close();
+
+            context.write(key, new Text(out.toString()));
+        }
+    }
+
+
+}
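For context on what the transform test feeds: ParsingMapper turns each whitespace-separated input row (date, metric name, value, application) into a Vespa put operation. A minimal sketch of the JSON it emits, using made-up values rather than rows from tabular_data.csv (which is not shown in this diff):

    {"fields":{"date":1418250000,"name":"clicks","value":23,"application":"myapp"},"put":"id:myapp:metric::clicks-1418250000"}

The "put" field carries the document id built from the application, metric name, and date; VespaOutputFormat then sends these operations, which the dryrun configuration set in setUp() counts via VespaCounters instead of feeding a real endpoint.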