Diffstat (limited to 'sample-apps/blog-tutorial-shared')
21 files changed, 1113 insertions, 0 deletions
diff --git a/sample-apps/blog-tutorial-shared/README.md b/sample-apps/blog-tutorial-shared/README.md new file mode 100644 index 00000000000..09ac61e6b56 --- /dev/null +++ b/sample-apps/blog-tutorial-shared/README.md @@ -0,0 +1,19 @@ +# Vespa tutorial utility scripts + +## From raw JSON to the Vespa feed format + + $ python parse.py trainPosts.json > somefile.json + +Parses the JSON in the trainPosts.json file downloaded from Kaggle during the [blog search tutorial](https://git.corp.yahoo.com/pages/vespa/documentation/documentation/tutorials/blog-search.html) and formats it according to the Vespa document JSON format. + + $ python parse.py -p trainPosts.json > somefile.json + +With the "-p" or "--popularity" flag, the script also calculates and adds the `popularity` field, as introduced [in the tutorial](https://git.corp.yahoo.com/pages/vespa/documentation/documentation/tutorials/blog-search.html#blog-popularity-signal). + +## Building and running the Spark script for calculating latent factors + +1. Install the latest version of [Apache Spark](http://spark.apache.org/) and [sbt](http://www.scala-sbt.org/download.html). + +2. Clone this repository and build the Spark script with `sbt package` (in the root directory of this repo). + +3. Use the resulting jar file when running the Spark jobs included in the tutorials; an example invocation is sketched below.
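To make step 3 concrete, here is a sketch of submitting the collaborative-filtering job with the jar that `sbt package` produces. The class name and option names come from the usage text of `BlogRecommendationApp` later in this change (the fully qualified name of the object is `com.yahoo.example.blog.BlogRecommendationApp`); the jar path assumes sbt's default artifact naming for this build (name `blog-support`, version 0.1, Scala 2.11), the rank/iteration/lambda values are the ones used in the unit tests, and the input and output paths are placeholders to adjust for your environment:

    $ spark-submit \
        --class com.yahoo.example.blog.BlogRecommendationApp \
        --master local[4] \
        target/scala-2.11/blog-support_2.11-0.1.jar \
        --task collaborative_filtering \
        --input_file path/to/training_set_ids \
        --rank 10 \
        --numIterations 10 \
        --lambda 0.01 \
        --output_path path/to/output

`--input_file` should point at the tab-separated (post_id, user_id) indices written by the `split_set` task, and `--output_path` is where the latent-factor files end up.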
\ No newline at end of file diff --git a/sample-apps/blog-tutorial-shared/build.sbt b/sample-apps/blog-tutorial-shared/build.sbt new file mode 100644 index 00000000000..4c94a7d15e0 --- /dev/null +++ b/sample-apps/blog-tutorial-shared/build.sbt @@ -0,0 +1,25 @@ +name := "blog-support" + +version := "0.1" + +scalaVersion := "2.11.8" +//scalaVersion := "2.10.5" + +// unit test +libraryDependencies += "org.scalatest" % "scalatest_2.11" % "2.2.6" % "test" +//libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0" % "test" + +// spark libraries +libraryDependencies ++= Seq( + "org.apache.spark" %% "spark-core" % "2.0.0", + "org.apache.spark" %% "spark-mllib" % "2.0.0", + "org.apache.spark" %% "spark-sql" % "2.0.0" +) + +//libraryDependencies ++= Seq( +// "org.apache.spark" %% "spark-core" % "1.6.2", +// "org.apache.spark" %% "spark-mllib" % "1.6.2", +// "org.apache.spark" %% "spark-sql" % "1.6.2" +//) + + diff --git a/sample-apps/blog-tutorial-shared/src/__init__.py b/sample-apps/blog-tutorial-shared/src/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/__init__.py diff --git a/sample-apps/blog-tutorial-shared/src/main/__init__.py b/sample-apps/blog-tutorial-shared/src/main/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/main/__init__.py diff --git a/sample-apps/blog-tutorial-shared/src/main/pig/tutorial_compute_metric.pig b/sample-apps/blog-tutorial-shared/src/main/pig/tutorial_compute_metric.pig new file mode 100644 index 00000000000..5df583e1f30 --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/main/pig/tutorial_compute_metric.pig @@ -0,0 +1,42 @@ +REGISTER $VESPA_HADOOP_JAR + +DEFINE BlogPostRecommendations + com.yahoo.vespa.hadoop.pig.VespaQuery( + 'query=http://$ENDPOINT/search/?user_id=<user_id>&hits=$NUMBER_RECOMMENDATIONS', + 'schema=rank:int,id:chararray,relevance:double,fields/post_id:chararray' + ); + +-- Load test_set data from a local file +test_indices = LOAD '$TEST_INDICES' AS (post_id:chararray, user_id:chararray); +users = FOREACH test_indices GENERATE user_id; +users = FILTER users BY user_id IS NOT null; +users = DISTINCT users; + +-- Run a set of queries against Vespa +recommendations = FOREACH users GENERATE user_id, + FLATTEN(BlogPostRecommendations(*)) AS (rank, id, relevance, post_id); +recommendations = FOREACH recommendations GENERATE user_id, rank, post_id; +recommendations = FILTER recommendations BY rank IS NOT NULL AND post_id IS NOT NULL; + +-- join data +joined_data = JOIN test_indices BY (post_id, user_id), recommendations BY (post_id, user_id); +joined_data = FOREACH joined_data GENERATE + test_indices::post_id AS post_id, + test_indices::user_id AS user_id, + rank; + +-- transform and add a column +joined_data = FOREACH joined_data + GENERATE post_id, + user_id, + rank, + (double)rank/(double)$NUMBER_RECOMMENDATIONS AS percentile; + +grouped_data = GROUP joined_data BY user_id; +grouped_data = FOREACH grouped_data + GENERATE group AS user_id, + SUM(joined_data.percentile) AS sum_percentile, + COUNT(joined_data.post_id) AS number_read, + (double)SUM(joined_data.percentile)/(double)COUNT(joined_data.post_id) AS expected_percentile; + +STORE grouped_data INTO '$OUTPUT';
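The Pig scripts in this change are driven entirely by parameters (`$VESPA_HADOOP_JAR`, `$ENDPOINT`, `$TEST_INDICES`, and so on) rather than hard-coded paths. As an illustration only — the jar location, endpoint host and data paths below are placeholders, not values defined anywhere in this change — `tutorial_compute_metric.pig` above could be run locally along these lines:

    $ pig -x local \
        -param VESPA_HADOOP_JAR=/path/to/vespa-hadoop.jar \
        -param ENDPOINT=your-vespa-host:8080 \
        -param NUMBER_RECOMMENDATIONS=100 \
        -param TEST_INDICES=path/to/testing_set_ids \
        -param OUTPUT=path/to/metric_output \
        tutorial_compute_metric.pig

The same pattern, one `-param` per `$VARIABLE`, applies to the two feed scripts that follow.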
\ No newline at end of file diff --git a/sample-apps/blog-tutorial-shared/src/main/pig/tutorial_feed_content_and_tensor_vespa.pig b/sample-apps/blog-tutorial-shared/src/main/pig/tutorial_feed_content_and_tensor_vespa.pig new file mode 100644 index 00000000000..a7863a6f78d --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/main/pig/tutorial_feed_content_and_tensor_vespa.pig @@ -0,0 +1,134 @@ +REGISTER '$VESPA_HADOOP_JAR' + +-- Create valid Vespa put operations +DEFINE VespaPutOperationDoc + com.yahoo.vespa.hadoop.pig.VespaDocumentOperation( + 'operation=put', + 'docid=id:blog-recommendation:blog_post::<post_id>', + 'create-tensor-fields=user_item_cf', + 'simple-array-fields=tags,categories' + ); + +DEFINE VespaPutOperationUser + com.yahoo.vespa.hadoop.pig.VespaDocumentOperation( + 'operation=put', + 'docid=id:blog-recommendation:user::<user_id>', + 'create-tensor-fields=user_item_cf', + 'simple-array-fields=has_read_items' + ); + +-- Transform tabular data to a Vespa document operation JSON format +DEFINE VespaStorage + com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load data +data = LOAD '$DATA_PATH' USING + JsonLoader('date_gmt:chararray, + language:chararray, + author:chararray, + url:chararray, + title:chararray, + blog:chararray, + post_id:chararray, + tags:{T:(tag_name:chararray)}, + blogname:chararray, + date:chararray, + content:chararray, + categories:{T:(category_name:chararray)}, + likes:{T:(dt:chararray, uid:chararray)}'); + +data_for_feed = FOREACH data GENERATE + date_gmt, + language, + author, + url, + title, + blog, + post_id, + tags, + blogname, + content, + categories; + +-- Feed only blog posts that belong to test set +test_indices = LOAD '$TEST_INDICES' AS (post_id, user_id); +test_indices = FOREACH test_indices GENERATE post_id; +test_indices = DISTINCT test_indices; + +test_data_for_feed = FOREACH (JOIN data_for_feed BY post_id, test_indices BY post_id) + GENERATE date_gmt AS date_gmt, + language AS language, + author AS author, + url AS url, + title AS title, + blog AS blog, + data_for_feed::post_id AS post_id, + tags AS tags, + blogname AS blogname, + content AS content, + categories AS categories; + +-- Load Blog post CF latent factors +data_doc = LOAD '$BLOG_POST_FACTORS' USING + JsonLoader('post_id:chararray, + user_item_cf:[double]'); + +-- Join data and latent factors +data_content_and_doc_tensor = JOIN test_data_for_feed BY post_id LEFT, data_doc BY post_id; +data_content_and_doc_tensor = FOREACH data_content_and_doc_tensor GENERATE + date_gmt AS date_gmt, + language AS language, + author AS author, + url AS url, + title AS title, + blog AS blog, + test_data_for_feed::post_id as post_id, + tags AS tags, + blogname AS blogname, + content AS content, + categories AS categories, + user_item_cf AS user_item_cf, + (user_item_cf IS NOT NULL ? 
1 : 0) AS has_user_item_cf; + +-- Generate valid Vespa JSON format +data_content_and_doc_tensor_feed = FOREACH data_content_and_doc_tensor GENERATE VespaPutOperationDoc(*); + +-- Load User CF latent factors +data_user = LOAD '$USER_FACTORS' USING + JsonLoader('user_id:chararray, + user_item_cf:[double]'); +data_user = FOREACH data_user GENERATE + user_id AS user_id, + user_item_cf AS user_item_cf; + +-- Articles already liked +data_likes = FOREACH data GENERATE post_id, FLATTEN(likes) AS (dt, uid); + +post_liked_per_user = GROUP data_likes BY uid; +post_liked_per_user = FOREACH post_liked_per_user GENERATE + group AS user_id, + data_likes.post_id AS has_read_items; + +-- Join user data +data_user = JOIN post_liked_per_user BY user_id FULL, + data_user BY user_id; + +data_user = FOREACH data_user GENERATE + (post_liked_per_user::user_id IS NOT NULL ? post_liked_per_user::user_id : data_user::user_id) AS user_id, + user_item_cf AS user_item_cf, + (user_item_cf IS NOT NULL ? 1 : 0) AS has_user_item_cf, + has_read_items AS has_read_items; + +data_user = FILTER data_user BY user_id IS NOT NULL; + +-- Generate valid Vespa JSON format +data_user_for_feed = FOREACH data_user GENERATE VespaPutOperationUser(*); + +joint_content_tensors = UNION data_content_and_doc_tensor_feed, data_user_for_feed; + +-- Store into Vespa +STORE joint_content_tensors INTO '$ENDPOINT' USING VespaStorage(); + + + + diff --git a/sample-apps/blog-tutorial-shared/src/main/pig/tutorial_feed_content_vespa.pig b/sample-apps/blog-tutorial-shared/src/main/pig/tutorial_feed_content_vespa.pig new file mode 100644 index 00000000000..5573e9fcbc7 --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/main/pig/tutorial_feed_content_vespa.pig @@ -0,0 +1,52 @@ +REGISTER '$VESPA_HADOOP_JAR' +-- REGISTER /Users/tmartins/projects/yahoo/sw/vespa-hadoop/target/vespa-hadoop.jar +-- REGISTER '/homes/tmartins/yinst_root/lib/jars/vespa-hadoop.jar' + +-- UDF to create valid Vespa document operation in JSON format +DEFINE VespaPutOperationDoc + com.yahoo.vespa.hadoop.pig.VespaDocumentOperation( + 'operation=put', + 'docid=id:blog-search:blog_post::<post_id>', + 'simple-array-fields=tags,categories' + ); + +-- UDF to send data to a Vespa endpoint +DEFINE VespaStorage + com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load data from any source - here we load using JsonLoader +data = LOAD '$DATA_PATH' USING + JsonLoader('date_gmt:chararray, + language:chararray, + author:chararray, + url:chararray, + title:chararray, + blog:chararray, + post_id:chararray, + tags:{T:(tag_name:chararray)}, + blogname:chararray, + date:chararray, + content:chararray, + categories:{T:(category_name:chararray)}, + likes:{T:(dt:chararray, uid:chararray)}'); + +-- Select fields that will be sent to Vespa. 
+-- This should follow blog_post.sd +data_for_feed = FOREACH data GENERATE + date_gmt, + language, + author, + url, + title, + blog, + post_id, + tags, + blogname, + content, + categories; + +-- Create valid Vespa put operations in JSON format +data_for_feed_json = FOREACH data_for_feed GENERATE VespaPutOperationDoc(*); + +-- Store into Vespa +STORE data_for_feed_json INTO '$ENDPOINT' USING VespaStorage(); diff --git a/sample-apps/blog-tutorial-shared/src/main/python/__init__.py b/sample-apps/blog-tutorial-shared/src/main/python/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/main/python/__init__.py diff --git a/sample-apps/blog-tutorial-shared/src/main/python/parse.py b/sample-apps/blog-tutorial-shared/src/main/python/parse.py new file mode 100644 index 00000000000..0d5f892eebc --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/main/python/parse.py @@ -0,0 +1,72 @@ +import json +import argparse + + +class KaggleRawDataParser: + + popularity = False + raw_data_file = None + total_number_of_likes = 0 + likes_per_blog = {} + + def __init__(self): + parser = argparse.ArgumentParser() + parser.add_argument("-p", "--popularity", action="store_true", help="add 'popularity' field") + parser.add_argument("file", help="location of file to be parsed") + args = parser.parse_args() + + self.popularity = args.popularity + self.raw_data_file = args.file + + def main(self): + if self.popularity: + self.calculate_popularity() + self.parse() + + def calculate_popularity(self): + unparsed_file = open(self.raw_data_file, "r") + + for line in unparsed_file: + data = json.loads(line) + + self.total_number_of_likes += len(data["likes"]) + if data["blog"] in self.likes_per_blog: + self.likes_per_blog[data["blog"]] += len(data["likes"]) + else: + self.likes_per_blog[data["blog"]] = len(data["likes"]) + + unparsed_file.close() + + def parse(self): + unparsed_file = open(self.raw_data_file, "r") + + for line in unparsed_file: + data = json.loads(line) + + parsed_data = { + "put": "id:blog-search:blog_post::" + data["post_id"], + "fields": { + "blogname": data["blogname"], + "post_id": data["post_id"], + "author": data["author"], + "language": data["language"], + "categories": data["categories"], + "title": data["title"], + "blog": data["blog"], + "date_gmt": data["date_gmt"], + "url": data["url"], + "content": data["content"], + "tags": data["tags"], + "date": int(data["date_gmt"][0:4] + data["date_gmt"][5:7] + data["date_gmt"][8:10]) + } + } + if self.popularity: + parsed_data["fields"]["popularity"] = \ + float(self.likes_per_blog[data["blog"]]) / float(self.total_number_of_likes) + + print(json.dumps(parsed_data)) + + unparsed_file.close() + +if __name__ == '__main__': + KaggleRawDataParser().main() diff --git a/sample-apps/blog-tutorial-shared/src/main/scala/com/yahoo/example/blog/BlogRecommendationApp.scala b/sample-apps/blog-tutorial-shared/src/main/scala/com/yahoo/example/blog/BlogRecommendationApp.scala new file mode 100644 index 00000000000..83d56718823 --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/main/scala/com/yahoo/example/blog/BlogRecommendationApp.scala @@ -0,0 +1,202 @@ +package com.yahoo.example.blog + +import org.apache.spark.sql.SparkSession + +object BlogRecommendationApp { + val usage = """ + Usage: spark-submit \ + | --class "BlogRecommendationApp" \ + | --master local[4] \ + | JAR_FILE + | --task task_command [TASK RELATED OPTIONS] + + spark-submit \ + | --class "BlogRecommendationApp" \ + | --master 
local[4] \ + | JAR_FILE + | --task collaborative_filtering + | --input_file path + | --rank value + | --numIterations value + | --lambda value + | --output_path path + + spark-submit \ + | --class "BlogRecommendationApp" \ + | --master local[4] \ + | JAR_FILE + | --task collaborative_filtering_cv + | --input_file path + | --numIterations value + | --output_path path + | + + spark-submit \ + | --class "BlogRecommendationApp" \ + | --master local[4] \ + | JAR_FILE + | --task split_set + | --input_file path + | --test_perc_stage1 value + | --test_perc_stage2 value + | --seed value + | --output_path path + """ + + private val COLLABORATIVE_FILTERING = "collaborative_filtering" + private val COLLABORATIVE_FILTERING_CV = "collaborative_filtering_cv" + private val SPLIT_SET_INTO_TRAIN_AND_TEST = "split_set" + + type OptionMap = Map[Symbol, Any] + + def main(args: Array[String]) { + + val options = parseCommandLineOptions(args) + val task_name = options('task).toString + + task_name match { + case COLLABORATIVE_FILTERING => CollaborativeFilteringExample(options) + case COLLABORATIVE_FILTERING_CV => CollaborativeFilteringCV(options) + case SPLIT_SET_INTO_TRAIN_AND_TEST => SplitSetIntoTrainingAndTestSets(options) + } + + } + + private def SplitSetIntoTrainingAndTestSets(options: OptionMap) = { + + val spark = SparkSession + .builder() + .appName("Split Full Data Into Train and Test Sets") + .getOrCreate() + + val splitter = new SplitFullSetIntoTrainAndTestSets(spark) + + val sets = splitter.run(input_file_path = options('input_file).toString, + test_perc_stage1 = options('test_perc_stage1).toString.toDouble, + test_perc_stage2 = options('test_perc_stage2).toString.toDouble, + seed = options('seed).toString.toInt) + + SplitFullSetIntoTrainAndTestSets.writeTrainAndTestSetsIndices(sets, options('output_path).toString) + + } + + private def CollaborativeFilteringExample(options: OptionMap) = { + + // TODO: Check if output_path already exist + + val spark = SparkSession + .builder() + .appName("Collaborative Filtering") + .getOrCreate() + + val cf = new CollaborativeFiltering(spark) + + val model = cf.run( + input_path = options('input_file).toString, + rank = options('rank).toString.toInt, + numIterations = options('num_iterations).toString.toInt, + lambda = options('lambda).toString.toDouble) + + CollaborativeFiltering.writeFeaturesAsVespaTensorText(model, options('output_path).toString) + + } + + private def CollaborativeFilteringCV(options: OptionMap) = { + + // TODO: Check if output_path already exist + + val spark = SparkSession + .builder() + .appName("Collaborative Filtering CV") + .getOrCreate() + + val cf = new CollaborativeFiltering(spark) + + val model = cf.run_pipeline( + input_path = options('input_file).toString, + numIterations = options('num_iterations).toString.toInt) + + CollaborativeFiltering.writeFeaturesAsVespaTensorText(model, options('output_path).toString) + + } + + private def parseCommandLineOptions(args: Array[String]): OptionMap = { + + def findTask(list: List[String]) : String = { + list match { + case Nil => println("Please, define a valid task" + "\n" + usage) + sys.exit(1) + case "--task" :: value :: tail => + value + case option :: tail => findTask(tail) + } + } + + def ParseCollaborativeFilteringOptions(map : OptionMap, list: List[String]) : OptionMap = { + list match { + case Nil => map + case "--input_file" :: value :: tail => + ParseCollaborativeFilteringOptions(map ++ Map('input_file -> value.toString), tail) + case "--rank" :: value :: tail => + 
ParseCollaborativeFilteringOptions(map ++ Map('rank -> value.toInt), tail) + case "--numIterations" :: value :: tail => + ParseCollaborativeFilteringOptions(map ++ Map('num_iterations -> value.toInt), tail) + case "--lambda" :: value :: tail => + ParseCollaborativeFilteringOptions(map ++ Map('lambda -> value.toDouble), tail) + case "--output_path" :: value :: tail => + ParseCollaborativeFilteringOptions(map ++ Map('output_path -> value.toString), tail) + case option :: tail => + ParseCollaborativeFilteringOptions(map, tail) + } + } + + def ParseCollaborativeFilteringCVOptions(map : OptionMap, list: List[String]) : OptionMap = { + list match { + case Nil => map + case "--input_file" :: value :: tail => + ParseCollaborativeFilteringOptions(map ++ Map('input_file -> value.toString), tail) + case "--numIterations" :: value :: tail => + ParseCollaborativeFilteringOptions(map ++ Map('num_iterations -> value.toInt), tail) + case "--output_path" :: value :: tail => + ParseCollaborativeFilteringOptions(map ++ Map('output_path -> value.toString), tail) + case option :: tail => + ParseCollaborativeFilteringOptions(map, tail) + } + } + + def ParseSplitSetOptions(map : OptionMap, list: List[String]) : OptionMap = { + list match { + case Nil => map + case "--input_file" :: value :: tail => + ParseSplitSetOptions(map ++ Map('input_file -> value.toString), tail) + case "--test_perc_stage1" :: value :: tail => + ParseSplitSetOptions(map ++ Map('test_perc_stage1 -> value.toDouble), tail) + case "--test_perc_stage2" :: value :: tail => + ParseSplitSetOptions(map ++ Map('test_perc_stage2 -> value.toDouble), tail) + case "--seed" :: value :: tail => + ParseSplitSetOptions(map ++ Map('seed -> value.toInt), tail) + case "--output_path" :: value :: tail => + ParseSplitSetOptions(map ++ Map('output_path -> value.toString), tail) + case option :: tail => + ParseSplitSetOptions(map , tail) + } + } + + if (args.length == 0) println(usage) + val arglist = args.toList + + val task_name = findTask(arglist) + + val options = task_name match { + case COLLABORATIVE_FILTERING => ParseCollaborativeFilteringOptions(Map('task -> task_name), arglist) + case COLLABORATIVE_FILTERING_CV => ParseCollaborativeFilteringCVOptions(Map('task -> task_name), arglist) + case SPLIT_SET_INTO_TRAIN_AND_TEST => ParseSplitSetOptions(Map('task -> task_name), arglist) + } + + options + + } + +} + + diff --git a/sample-apps/blog-tutorial-shared/src/main/scala/com/yahoo/example/blog/CollaborativeFiltering.scala b/sample-apps/blog-tutorial-shared/src/main/scala/com/yahoo/example/blog/CollaborativeFiltering.scala new file mode 100644 index 00000000000..f20e23321a1 --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/main/scala/com/yahoo/example/blog/CollaborativeFiltering.scala @@ -0,0 +1,159 @@ +package com.yahoo.example.blog + +import org.apache.spark.ml.recommendation.{ALS, ALSModel} +import org.apache.spark.ml.evaluation.RegressionEvaluator +import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} +import org.apache.spark.mllib.recommendation.Rating +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.types.IntegerType +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.{Row, DataFrame} +import org.apache.spark.sql.functions.{col, explode} + +import scala.collection.mutable +import scala.util.parsing.json.JSONObject + +class CollaborativeFiltering(val ss: SparkSession) { + + import ss.implicits._ + + def loadTrainingIndicesIntoDataFrame(input_path: String) = { + 
+ val ratings = ss.sparkContext.textFile(input_path) + .map(_.split("\t")) + .map(p => (p(0), p(1), 1)) + .toDF("post_id", "user_id", "label") + .filter(col("post_id").notEqual("null")) + .filter(col("user_id").notEqual("null")) + .select(col("post_id").cast(IntegerType).as("post_id"), + col("user_id").cast(IntegerType).as("user_id"), + col("label").cast(IntegerType).as("label")) + + ratings + + } + + def loadDataIntoDataFrame(input_path: String): DataFrame = { + + val dataset = ss.read.json(input_path) + + val setOne = udf(() => 1) + + val ratings = dataset.select(col("post_id").cast(IntegerType).as("post_id"), + explode(col("likes")).as("likes_flat")) + .select(col("post_id"), col("likes_flat.uid").cast(IntegerType).as("user_id")) + .withColumn("label", setOne()) + + ratings + + } + + def loadDataIntoRating(input_path: String): RDD[Rating] = { + + val dataset: DataFrame = ss.read.json(input_path) + + val ratings = dataset.select(col("post_id"), explode(col("likes")).as("likes_flat")) + .select(col("post_id"), col("likes_flat.uid").as("user_id")) + .rdd.map { + case Row(post_id: String, + user_id: String) => + Rating(user_id.toInt, post_id.toInt, 1) + } + + ratings + + } + + def run(input_path: String, rank: Int, numIterations: Int, lambda: Double): ALSModel = { + + // Loading and preparing the data + val ratings = loadTrainingIndicesIntoDataFrame(input_path) + + // Fitting the model + val model = new ALS() + .setItemCol("post_id") + .setRatingCol("label") + .setUserCol("user_id") + .setImplicitPrefs(true) + .setAlpha(lambda) + .setMaxIter(numIterations) + .setRank(rank) + .fit(ratings) + + model + + } + + def run_pipeline(input_path: String, numIterations: Int): ALSModel = { + + // Loading and preparing the data + val ratings = loadTrainingIndicesIntoDataFrame(input_path) + + // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. + val collaborative_filtering = new ALS() + .setItemCol("post_id") + .setRatingCol("label") + .setUserCol("user_id") + .setMaxIter(numIterations) + + val paramGrid = new ParamGridBuilder() + .addGrid(collaborative_filtering.rank, Array(10, 50, 100)) + .addGrid(collaborative_filtering.alpha, Array(0.001, 0.01, 0.1)) + .build() + + val cv = new CrossValidator() + .setEstimator(collaborative_filtering) + .setEvaluator(new RegressionEvaluator) + .setEstimatorParamMaps(paramGrid) + .setNumFolds(2) // Use 3+ in practice + + // Run cross-validation, and choose the best set of parameters. 
+ val cvModel = cv.fit(ratings) + + cvModel.bestModel.asInstanceOf[ALSModel] + + } + +} + +object CollaborativeFiltering { + + def writeModelFeaturesAsTensor[T] (modelFeatures:(Int, mutable.WrappedArray[T]), id_string:String) = { + + val id = modelFeatures._1 + val latentVector = modelFeatures._2 + var latentVectorMap:Map[String,T] = Map() + var output:Map[String,Any] = Map() + + for ( i <- latentVector.indices ){ + + latentVectorMap += (("user_item_cf:" + i.toString, latentVector(i))) + + } + + output += ((id_string, id)) + output += (("user_item_cf", JSONObject(latentVectorMap))) + + JSONObject(output) + + } + + def writeFeaturesAsVespaTensorText(model: ALSModel, output_path: String): Unit ={ + + model + .itemFactors.rdd + .map { + case Row(id: Int, features: mutable.WrappedArray[Double]) => writeModelFeaturesAsTensor((id, features), "post_id") + } + .saveAsTextFile(output_path + "/product_features") + model + .userFactors.rdd + .map { + case Row(id: Int, features: mutable.WrappedArray[Double]) => writeModelFeaturesAsTensor((id, features), "user_id") + } + .saveAsTextFile(output_path + "/user_features") + + } + +} diff --git a/sample-apps/blog-tutorial-shared/src/main/scala/com/yahoo/example/blog/SplitFullSetIntoTrainAndTestSets.scala b/sample-apps/blog-tutorial-shared/src/main/scala/com/yahoo/example/blog/SplitFullSetIntoTrainAndTestSets.scala new file mode 100644 index 00000000000..41b9b2b2dd4 --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/main/scala/com/yahoo/example/blog/SplitFullSetIntoTrainAndTestSets.scala @@ -0,0 +1,80 @@ +package com.yahoo.example.blog + +import org.apache.spark.sql.{SparkSession, DataFrame} +import org.apache.spark.sql.functions.udf +import org.apache.spark.sql.functions._ + +class SplitFullSetIntoTrainAndTestSets(val ss: SparkSession) { + + private def loadAndSimplifyFullDataset(input_file_path: String): DataFrame = { + + // Load full dataset + val full_dataset = ss.read.json(input_file_path) + val full_dataset_simple = full_dataset.select(col("post_id"), size(col("likes")).as("number_likes"), col("likes")) + + full_dataset_simple + + } + + private def splitSimplifiedDatasetIntoTrainAndTestSets(full_dataset_simple: DataFrame, + test_perc_stage1: Double, + test_perc_stage2: Double, + seed: Int): Array[DataFrame] = { + + // Set some blog posts aside to be present only on the test set + var sets = full_dataset_simple.randomSplit(Array(1 - test_perc_stage1, test_perc_stage1), seed) + + val training_set = sets(0) + val training_set_null = training_set.filter("number_likes = 0") + var training_set_exploded = training_set.select(col("post_id"), explode(col("likes")).as("likes_flat")) + training_set_exploded = training_set_exploded.select("post_id", "likes_flat.uid") + + val test_set = sets(1) + val test_set_null = test_set.filter("number_likes = 0") + var test_set_exploded = test_set.select(col("post_id"), explode(col("likes")).as("likes_flat")) + test_set_exploded = test_set_exploded.select("post_id", "likes_flat.uid") + + // randomly move some (post_id, uid) from training set to test set + sets = training_set_exploded.randomSplit(Array(1 - test_perc_stage2, test_perc_stage2), seed) + + training_set_exploded = sets(0) + + val additional_test_set_exploded = sets(1) + test_set_exploded = test_set_exploded.union(additional_test_set_exploded) + + // concatenate exploded set with null set + val getNull = udf(() => None: Option[String]) + training_set_exploded = training_set_exploded.union(training_set_null.select("post_id").withColumn("uid", getNull())) + 
test_set_exploded = test_set_exploded.union(test_set_null.select("post_id").withColumn("uid", getNull())) + + Array(training_set_exploded, test_set_exploded) + + } + + def run(input_file_path: String, test_perc_stage1: Double, test_perc_stage2:Double, seed: Int): Array[DataFrame] = { + + val full_dataset_simple = loadAndSimplifyFullDataset(input_file_path) + + splitSimplifiedDatasetIntoTrainAndTestSets(full_dataset_simple, + test_perc_stage1, + test_perc_stage2, + seed) + + } + +} + +object SplitFullSetIntoTrainAndTestSets { + + def writeTrainAndTestSetsIndices(train_and_test_sets: Array[DataFrame], output_path: String): Unit = { + + val training_set_exploded = train_and_test_sets(0) + val test_set_exploded = train_and_test_sets(1) + + // Write to disk + training_set_exploded.rdd.map(x => x(0) + "\t" + x(1)).saveAsTextFile(output_path + "/training_set_ids") + test_set_exploded.rdd.map(x => x(0) + "\t" + x(1)).saveAsTextFile(output_path + "/testing_set_ids") + + } + +} diff --git a/sample-apps/blog-tutorial-shared/src/test/__init__.py b/sample-apps/blog-tutorial-shared/src/test/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/test/__init__.py diff --git a/sample-apps/blog-tutorial-shared/src/test/python/__init__.py b/sample-apps/blog-tutorial-shared/src/test/python/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/test/python/__init__.py diff --git a/sample-apps/blog-tutorial-shared/src/test/python/parse-unittest.py b/sample-apps/blog-tutorial-shared/src/test/python/parse-unittest.py new file mode 100644 index 00000000000..9579f619aed --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/test/python/parse-unittest.py @@ -0,0 +1,179 @@ +import os +import sys +import unittest +import json +from StringIO import StringIO + +import src.main.python.parse as parse + +class KaggleRawDataParserTest(unittest.TestCase): + + raw_test_file = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) + "/resources/trainPostsSampleWith3Elements.json" + saved_stdout = sys.stdout + out = StringIO() + + + def setUp(self): + sys.argv.append(self.raw_test_file) + + self.out = StringIO() + sys.stdout = self.out + + def tearDown(self): + sys.argv = [sys.argv[0]] + + sys.stdout = self.saved_stdout + + + def test_no_flags(self): + parser = parse.KaggleRawDataParser() + + self.assertFalse(parser.popularity) + self.assertEqual(parser.raw_data_file, self.raw_test_file) + + def test_popularity_flag(self): + sys.argv.append("-p") + parser = parse.KaggleRawDataParser() + + self.assertTrue(parser.popularity) + + def test_parsing_without_popularity(self): + parser = parse.KaggleRawDataParser() + + parser.parse() + + output_array = self.out.getvalue().strip().split('\n') + compare_with = [{ + "fields": { + "author": "5", + "blog": "4", + "blogname": "Matt on Not-WordPress", + "categories": [ + "Moblog" + ], + "content": "<a href=\"http://matt.files.wordpress.com/2012/03/photo19.jpg\"><img src=\"http://matt.files.wordpress.com/2012/03/photo19.jpg\" alt=\"\" title=\"photo19\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3838\" /></a>", + "date": 20120328, + "date_gmt": "2012-03-28 03:36:57", + "language": "en", + "post_id": "507823", + "tags": [], + "title": "#vipworkshop dinner", + "url": "http://matt.wordpress.com/?p=3837" + }, + "put": "id:blog-search:blog_post::507823" + }, + { + "fields": { + "author": "5", + "blog": "4", + "blogname": "Matt on 
Not-WordPress", + "categories": [ + "Moblog" + ], + "content": "<a href=\"http://matt.files.wordpress.com/2012/03/photo20.jpg\"><img src=\"http://matt.files.wordpress.com/2012/03/photo20.jpg\" alt=\"\" title=\"photo20\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3840\" /></a>", + "date": 20120328, + "date_gmt": "2012-03-28 04:41:37", + "language": "en", + "post_id": "1406963", + "tags": [], + "title": "Oven roasted tomatoes", + "url": "http://matt.wordpress.com/?p=3839" + }, + "put": "id:blog-search:blog_post::1406963" + }, + { + "fields": { + "author": "5", + "blog": "4", + "blogname": "Matt on Not-WordPress", + "categories": [ + "Moblog" + ], + "content": "<a href=\"http://matt.files.wordpress.com/2012/03/photo21.jpg\"><img src=\"http://matt.files.wordpress.com/2012/03/photo21.jpg\" alt=\"\" title=\"photo21\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3842\" /></a>", + "date": 20120328, + "date_gmt": "2012-03-28 19:59:45", + "language": "en", + "post_id": "1329369", + "tags": [], + "title": "Fish tacos and spicy slaw", + "url": "http://matt.wordpress.com/?p=3841" + }, + "put": "id:blog-search:blog_post::1329369" + }] + + for i in range(0, 3): + self.assertEqual(json.loads(output_array[i]), compare_with[i]) + + def test_parsing_with_popularity(self): + sys.argv.append("-p") + parser = parse.KaggleRawDataParser() + + parser.main() + + output_array = self.out.getvalue().strip().split('\n') + compare_with = [{ + "fields": { + "author": "5", + "blog": "4", + "blogname": "Matt on Not-WordPress", + "categories": [ + "Moblog" + ], + "content": "<a href=\"http://matt.files.wordpress.com/2012/03/photo19.jpg\"><img src=\"http://matt.files.wordpress.com/2012/03/photo19.jpg\" alt=\"\" title=\"photo19\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3838\" /></a>", + "date": 20120328, + "date_gmt": "2012-03-28 03:36:57", + "language": "en", + "popularity": 1.0, + "post_id": "507823", + "tags": [], + "title": "#vipworkshop dinner", + "url": "http://matt.wordpress.com/?p=3837" + }, + "put": "id:blog-search:blog_post::507823" + }, + { + "fields": { + "author": "5", + "blog": "4", + "blogname": "Matt on Not-WordPress", + "categories": [ + "Moblog" + ], + "content": "<a href=\"http://matt.files.wordpress.com/2012/03/photo20.jpg\"><img src=\"http://matt.files.wordpress.com/2012/03/photo20.jpg\" alt=\"\" title=\"photo20\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3840\" /></a>", + "date": 20120328, + "date_gmt": "2012-03-28 04:41:37", + "language": "en", + "popularity": 1.0, + "post_id": "1406963", + "tags": [], + "title": "Oven roasted tomatoes", + "url": "http://matt.wordpress.com/?p=3839" + }, + "put": "id:blog-search:blog_post::1406963" + }, + { + "fields": { + "author": "5", + "blog": "4", + "blogname": "Matt on Not-WordPress", + "categories": [ + "Moblog" + ], + "content": "<a href=\"http://matt.files.wordpress.com/2012/03/photo21.jpg\"><img src=\"http://matt.files.wordpress.com/2012/03/photo21.jpg\" alt=\"\" title=\"photo21\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3842\" /></a>", + "date": 20120328, + "date_gmt": "2012-03-28 19:59:45", + "language": "en", + "popularity": 1.0, + "post_id": "1329369", + "tags": [], + "title": "Fish tacos and spicy slaw", + "url": "http://matt.wordpress.com/?p=3841" + }, + "put": "id:blog-search:blog_post::1329369" + }] + + for i in range(0, 3): + self.assertEqual(json.loads(output_array[i]), compare_with[i]) + +if __name__ == '__main__': + 
unittest.main()
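A note on running this test: it uses the Python 2 standard library (`from StringIO import StringIO`) and imports the parser as `src.main.python.parse`, so the repository root has to be on the module path. One way to run it, assuming Python 2 and the root of this sample app as the working directory:

    $ PYTHONPATH=. python src/test/python/parse-unittest.py

The empty `__init__.py` files added in this change are what make the `src.main.python` package importable here.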
\ No newline at end of file diff --git a/sample-apps/blog-tutorial-shared/src/test/resources/trainPostsSample.json b/sample-apps/blog-tutorial-shared/src/test/resources/trainPostsSample.json new file mode 100644 index 00000000000..c7b7e32f396 --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/test/resources/trainPostsSample.json @@ -0,0 +1,10 @@ +{"date_gmt":"2012-03-28 03:36:57", "language": "en", "author": "5", "url": "http://matt.wordpress.com/?p=3837", "title": "#vipworkshop dinner", "blog": "4", "post_id": "507823", "tags": [], "blogname": "Matt on Not-WordPress", "date": "2012-03-28 03:36:57", "content": "<a href=\"http://matt.files.wordpress.com/2012/03/photo19.jpg\"><img src=\"http://matt.files.wordpress.com/2012/03/photo19.jpg\" alt=\"\" title=\"photo19\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3838\" /></a>", "categories": ["Moblog"], "likes": [{"dt": "2012-03-31 11:43:38", "uid": "6218184"}, {"dt": "2012-03-28 10:25:22", "uid": "11335199"}]} +{"date_gmt":"2012-03-28 04:41:37", "language": "en", "author": "5", "url": "http://matt.wordpress.com/?p=3839", "title": "Oven roasted tomatoes", "blog": "4", "post_id": "1406963", "tags": [], "blogname": "Matt on Not-WordPress", "date": "2012-03-28 04:41:37", "content": "<a href=\"http://matt.files.wordpress.com/2012/03/photo20.jpg\"><img src=\"http://matt.files.wordpress.com/2012/03/photo20.jpg\" alt=\"\" title=\"photo20\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3840\" /></a>", "categories": ["Moblog"], "likes": [{"dt": "2012-03-31 11:43:03", "uid": "6218184"}, {"dt": "2012-03-28 05:01:34", "uid": "26248885"}]} +{"date_gmt":"2012-03-28 19:59:45", "language": "en", "author": "5", "url": "http://matt.wordpress.com/?p=3841", "title": "Fish tacos and spicy slaw", "blog": "4", "post_id": "1329369", "tags": [], "blogname": "Matt on Not-WordPress", "date": "2012-03-28 19:59:45", "content": "<a href=\"http://matt.files.wordpress.com/2012/03/photo21.jpg\"><img src=\"http://matt.files.wordpress.com/2012/03/photo21.jpg\" alt=\"\" title=\"photo21\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3842\" /></a>", "categories": ["Moblog"], "likes": [{"dt": "2012-03-28 21:05:37", "uid": "31367867"}]} +{"date_gmt":"2012-03-31 00:58:37", "language": "en", "author": "5", "url": "http://matt.wordpress.com/?p=3845", "title": "White corn guacamole", "blog": "4", "post_id": "916703", "tags": [], "blogname": "Matt on Not-WordPress", "date": "2012-03-31 00:58:37", "content": "<a href=\"http://matt.files.wordpress.com/2012/03/photo22.jpg\"><img src=\"http://matt.files.wordpress.com/2012/03/photo22.jpg\" alt=\"\" title=\"photo22\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3846\" /></a>", "categories": ["Moblog"], "likes": [{"dt": "2012-03-31 11:42:15", "uid": "6218184"}, {"dt": "2012-03-31 03:32:06", "uid": "33301824"}]} +{"date_gmt":"2012-03-31 18:46:59", "language": "en", "author": "5", "url": "http://matt.wordpress.com/?p=3847", "title": "#WordPress cake and wine", "blog": "4", "post_id": "1829542", "tags": [], "blogname": "Matt on Not-WordPress", "date": "2012-03-31 18:46:59", "content": "<a href=\"http://matt.files.wordpress.com/2012/03/photo23.jpg\"><img src=\"http://matt.files.wordpress.com/2012/03/photo23.jpg\" alt=\"\" title=\"photo23\" width=\"640\" height=\"480\" class=\"alignnone size-full wp-image-3848\" /></a>", "categories": ["Moblog"], "likes": [{"dt": "2012-04-04 14:22:40", "uid": "21552"}, {"dt": "2012-04-01 07:51:59", "uid": "50392"}, {"dt": 
"2012-04-12 16:58:42", "uid": "53742"}, {"dt": "2012-04-05 13:37:51", "uid": "396702"}, {"dt": "2012-04-02 22:30:22", "uid": "414033"}, {"dt": "2012-04-01 08:17:25", "uid": "872435"}, {"dt": "2012-03-31 18:50:18", "uid": "1156143"}, {"dt": "2012-04-01 07:25:27", "uid": "1246555"}, {"dt": "2012-03-31 19:26:40", "uid": "2177886"}, {"dt": "2012-03-31 19:13:17", "uid": "3346825"}, {"dt": "2012-04-01 21:50:49", "uid": "5073742"}, {"dt": "2012-03-31 19:46:11", "uid": "6134205"}, {"dt": "2012-03-31 19:07:17", "uid": "6433901"}, {"dt": "2012-04-04 01:01:08", "uid": "6894686"}, {"dt": "2012-03-31 18:53:19", "uid": "7073116"}, {"dt": "2012-03-31 18:51:52", "uid": "8288845"}, {"dt": "2012-04-23 10:17:31", "uid": "12788480"}, {"dt": "2012-04-05 15:50:45", "uid": "28688316"}, {"dt": "2012-04-01 19:46:36", "uid": "29630467"}, {"dt": "2012-03-31 18:48:49", "uid": "33013507"}]} +{"date_gmt":"2012-04-01 03:48:07", "language": "en", "author": "5", "url": "http://matt.wordpress.com/?p=3849", "title": "Oysters!", "blog": "4", "post_id": "1197076", "tags": [], "blogname": "Matt on Not-WordPress", "date": "2012-04-01 03:48:07", "content": "<a href=\"http://matt.files.wordpress.com/2012/04/photo.jpg\"><img src=\"http://matt.files.wordpress.com/2012/04/photo.jpg\" alt=\"\" title=\"photo\" width=\"640\" height=\"480\" class=\"alignnone size-full wp-image-3850\" /></a>", "categories": ["Moblog"], "likes": [{"dt": "2012-04-01 03:59:19", "uid": "5360368"}, {"dt": "2012-04-01 03:52:29", "uid": "8689260"}, {"dt": "2012-04-10 06:08:23", "uid": "26404032"}]} +{"date_gmt":"2012-04-01 19:05:20", "language": "en", "author": "5", "url": "http://matt.wordpress.com/?p=3851", "title": "Crab and artichoke pizza", "blog": "4", "post_id": "377833", "tags": [], "blogname": "Matt on Not-WordPress", "date": "2012-04-01 19:05:20", "content": "<a href=\"http://matt.files.wordpress.com/2012/04/photo1.jpg\"><img src=\"http://matt.files.wordpress.com/2012/04/photo1.jpg\" alt=\"\" title=\"photo1\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3852\" /></a>", "categories": ["Moblog"], "likes": [{"dt": "2012-04-13 19:29:25", "uid": "6218184"}]} +{"date_gmt":"2012-04-01 19:48:38", "language": "en", "author": "5", "url": "http://matt.wordpress.com/?p=3853", "title": "Framboise float and brownie", "blog": "4", "post_id": "871687", "tags": [], "blogname": "Matt on Not-WordPress", "date": "2012-04-01 19:48:38", "content": "<a href=\"http://matt.files.wordpress.com/2012/04/photo2.jpg\"><img src=\"http://matt.files.wordpress.com/2012/04/photo2.jpg\" alt=\"\" title=\"photo2\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3854\" /></a>", "categories": ["Moblog"], "likes": [{"dt": "2012-04-13 19:28:36", "uid": "6218184"}]} +{"date_gmt":"2012-04-01 21:09:09", "language": "en", "author": "5", "url": "http://matt.wordpress.com/?p=3855", "title": "Rothko #10", "blog": "4", "post_id": "1893680", "tags": [], "blogname": "Matt on Not-WordPress", "date": "2012-04-01 21:09:09", "content": "<a href=\"http://matt.files.wordpress.com/2012/04/photo3.jpg\"><img src=\"http://matt.files.wordpress.com/2012/04/photo3.jpg\" alt=\"\" title=\"photo3\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3856\" /></a>", "categories": ["Moblog"], "likes": [{"dt": "2012-04-01 21:41:34", "uid": "2096880"}, {"dt": "2012-04-02 03:50:33", "uid": "4634349"}, {"dt": "2012-04-01 21:10:13", "uid": "6766437"}, {"dt": "2012-04-01 21:35:48", "uid": "11335199"}, {"dt": "2012-04-01 21:18:40", "uid": "11691159"}]} 
+{"date_gmt":"2012-04-03 13:22:50", "language": "en", "author": "5", "url": "http://matt.wordpress.com/?p=3857", "title": "Port lights", "blog": "4", "post_id": "891295", "tags": [], "blogname": "Matt on Not-WordPress", "date": "2012-04-03 13:22:50", "content": "<a href=\"http://matt.files.wordpress.com/2012/04/photo4.jpg\"><img src=\"http://matt.files.wordpress.com/2012/04/photo4.jpg\" alt=\"\" title=\"photo4\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3858\" /></a>", "categories": ["Moblog"], "likes": [{"dt": "2012-04-04 05:50:43", "uid": "4204944"}, {"dt": "2012-04-03 15:23:45", "uid": "6134205"}, {"dt": "2012-04-03 13:27:01", "uid": "8399160"}, {"dt": "2012-04-03 13:36:09", "uid": "11335199"}, {"dt": "2012-04-03 13:24:17", "uid": "33301824"}]} diff --git a/sample-apps/blog-tutorial-shared/src/test/resources/trainPostsSampleWith3Elements.json b/sample-apps/blog-tutorial-shared/src/test/resources/trainPostsSampleWith3Elements.json new file mode 100644 index 00000000000..c144300fe37 --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/test/resources/trainPostsSampleWith3Elements.json @@ -0,0 +1,3 @@ +{"date_gmt":"2012-03-28 03:36:57", "language": "en", "author": "5", "url": "http://matt.wordpress.com/?p=3837", "title": "#vipworkshop dinner", "blog": "4", "post_id": "507823", "tags": [], "blogname": "Matt on Not-WordPress", "date": "2012-03-28 03:36:57", "content": "<a href=\"http://matt.files.wordpress.com/2012/03/photo19.jpg\"><img src=\"http://matt.files.wordpress.com/2012/03/photo19.jpg\" alt=\"\" title=\"photo19\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3838\" /></a>", "categories": ["Moblog"], "likes": [{"dt": "2012-03-31 11:43:38", "uid": "6218184"}, {"dt": "2012-03-28 10:25:22", "uid": "11335199"}]} +{"date_gmt":"2012-03-28 04:41:37", "language": "en", "author": "5", "url": "http://matt.wordpress.com/?p=3839", "title": "Oven roasted tomatoes", "blog": "4", "post_id": "1406963", "tags": [], "blogname": "Matt on Not-WordPress", "date": "2012-03-28 04:41:37", "content": "<a href=\"http://matt.files.wordpress.com/2012/03/photo20.jpg\"><img src=\"http://matt.files.wordpress.com/2012/03/photo20.jpg\" alt=\"\" title=\"photo20\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3840\" /></a>", "categories": ["Moblog"], "likes": [{"dt": "2012-03-31 11:43:03", "uid": "6218184"}, {"dt": "2012-03-28 05:01:34", "uid": "26248885"}]} +{"date_gmt":"2012-03-28 19:59:45", "language": "en", "author": "5", "url": "http://matt.wordpress.com/?p=3841", "title": "Fish tacos and spicy slaw", "blog": "4", "post_id": "1329369", "tags": [], "blogname": "Matt on Not-WordPress", "date": "2012-03-28 19:59:45", "content": "<a href=\"http://matt.files.wordpress.com/2012/03/photo21.jpg\"><img src=\"http://matt.files.wordpress.com/2012/03/photo21.jpg\" alt=\"\" title=\"photo21\" width=\"1000\" height=\"750\" class=\"alignnone size-full wp-image-3842\" /></a>", "categories": ["Moblog"], "likes": [{"dt": "2012-03-28 21:05:37", "uid": "31367867"}]} diff --git a/sample-apps/blog-tutorial-shared/src/test/resources/trainingSetIndicesSample.txt b/sample-apps/blog-tutorial-shared/src/test/resources/trainingSetIndicesSample.txt new file mode 100644 index 00000000000..7312ed2fdca --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/test/resources/trainingSetIndicesSample.txt @@ -0,0 +1,20 @@ +1000136 30856199 +1000631 19883445 +1000631 24350500 +1000631 25936432 +1000631 29881381 +1000776 30532923 +1001135 20234756 +1001135 31697777 +100156 12076558 
+100156 12159263 +100156 17277682 +100156 964144 +1002462 1878322 +1002515 16433161 +1002515 21922168 +1002515 23039563 +1002515 2493815 +1002599 23733565 +1002599 29430402 +1002599 30449719 diff --git a/sample-apps/blog-tutorial-shared/src/test/scala/com/yahoo/example/blog/BlogRecommendationAppTest.scala b/sample-apps/blog-tutorial-shared/src/test/scala/com/yahoo/example/blog/BlogRecommendationAppTest.scala new file mode 100644 index 00000000000..6f79d752fa1 --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/test/scala/com/yahoo/example/blog/BlogRecommendationAppTest.scala @@ -0,0 +1,9 @@ +package com.yahoo.example.blog + +import org.scalatest.FunSuite + +class BlogRecommendationAppTest extends FunSuite { + + test("CollaborativeFilteringApp writes user and item latent factors to output path") (pending) + +}
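The Scala test above is a pending placeholder; the two suites that follow create their own local SparkSession (`master("local[*]")`), so no external Spark cluster is needed. With the ScalaTest dependency declared in build.sbt, they should run from the root of this sample app with a plain

    $ sbt test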
\ No newline at end of file diff --git a/sample-apps/blog-tutorial-shared/src/test/scala/com/yahoo/example/blog/CollaborativeFilteringTest.scala b/sample-apps/blog-tutorial-shared/src/test/scala/com/yahoo/example/blog/CollaborativeFilteringTest.scala new file mode 100644 index 00000000000..c660b45630a --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/test/scala/com/yahoo/example/blog/CollaborativeFilteringTest.scala @@ -0,0 +1,66 @@ +package com.yahoo.example.blog + +import org.apache.spark.ml.recommendation.ALSModel +import org.apache.spark.sql.SparkSession +import org.scalatest.Matchers._ +import org.scalatest._ + +class CollaborativeFilteringTest extends FunSuite with BeforeAndAfter { + + var ss: SparkSession = _ + + before { + + ss = SparkSession + .builder() + .appName("Unit Test") + .master("local[*]") + .getOrCreate() + + } + + after { + ss.stop() + } + + test("run method returns a MatrixFactorizationModel with latent factors of size 10 to user and item") { + + val file_path = getClass.getResource("/trainingSetIndicesSample.txt") + + val cf = new CollaborativeFiltering(ss) + + val model = cf.run( + input_path = file_path.toString, + rank = 10, + numIterations = 10, + lambda = 0.01) + + model shouldBe a [ALSModel] + + val product_feature_array = model.itemFactors.first().getSeq(1) + assertResult(10){product_feature_array.length} + + val user_feature_array = model.userFactors.first().getSeq(1) + assertResult(10){user_feature_array.length} + + } + + test("run_pipeline method returns a MatrixFactorizationModel with latent factors of size 10 to user and item") { + + val file_path = getClass.getResource("/trainingSetIndicesSample.txt") + + val cf = new CollaborativeFiltering(ss) + + val model = cf.run_pipeline(input_path = file_path.toString, numIterations = 10) + + model shouldBe a [ALSModel] + + val product_feature_array = model.itemFactors.first().getSeq(1) + assertResult(10){product_feature_array.length} + + val user_feature_array = model.userFactors.first().getSeq(1) + assertResult(10){user_feature_array.length} + + } + +} diff --git a/sample-apps/blog-tutorial-shared/src/test/scala/com/yahoo/example/blog/SplitFullSetIntoTrainAndTestSetsTest.scala b/sample-apps/blog-tutorial-shared/src/test/scala/com/yahoo/example/blog/SplitFullSetIntoTrainAndTestSetsTest.scala new file mode 100644 index 00000000000..43c08492c71 --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/test/scala/com/yahoo/example/blog/SplitFullSetIntoTrainAndTestSetsTest.scala @@ -0,0 +1,41 @@ +package com.yahoo.example.blog + +import org.apache.spark.sql.{SparkSession, DataFrame} +import org.scalatest.Matchers._ +import org.scalatest._ + +class SplitFullSetIntoTrainAndTestSetsTest extends FunSuite with BeforeAndAfter { + + var ss: SparkSession = _ + + before { + + ss = SparkSession + .builder() + .appName("Unit Test") + .master("local[*]") + .getOrCreate() + + } + + after { + ss.stop() + } + + test("SplitFullSetIntoTrainAndTestSets should return an Array of DataFrame") { + + val file_path = getClass.getResource("/trainPostsSample.json") + + val splitter = new SplitFullSetIntoTrainAndTestSets(ss) + + val sets = splitter.run(input_file_path = file_path.toString, + test_perc_stage1 = 0.05, + test_perc_stage2 = 0.15, + seed = 123) + + sets shouldBe a [Array[DataFrame]] + + } + +} + |
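One detail that ties the Spark and Pig pieces together: `CollaborativeFiltering.writeFeaturesAsVespaTensorText` writes one JSON object per item to `<output_path>/product_features` and one per user to `<output_path>/user_features`, and those directories are what `tutorial_feed_content_and_tensor_vespa.pig` expects as `$BLOG_POST_FACTORS` and `$USER_FACTORS`. Given the `JSONObject` serialization used above, an item line should look roughly like the following — the id and factor values here are invented for illustration, and a real line has one `user_item_cf:<i>` entry per latent dimension (i.e. `rank` of them):

    {"post_id" : 507823, "user_item_cf" : {"user_item_cf:0" : 0.12, "user_item_cf:1" : -0.07, "user_item_cf:2" : 0.31}}

User lines have the same shape, with `user_id` as the id field.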