diff options
author | Kristian Aune <kraune@yahoo-inc.com> | 2017-06-06 14:26:02 +0200 |
---|---|---|
committer | Kristian Aune <kraune@yahoo-inc.com> | 2017-06-06 14:26:02 +0200 |
commit | 8ff2b8e72d7590ce02ec136a2f482c1e8e202961 (patch) | |
tree | 2d59e04adc7c1272eba3da72e9484d3e05752eb6 /sample-apps | |
parent | ef89ead652b55d7742767aea7e9c3d9243f19336 (diff) |
move support code to repo
Diffstat (limited to 'sample-apps')
13 files changed, 740 insertions, 0 deletions
diff --git a/sample-apps/blog-recommendation/src/pig/feed_content_and_tensor_vespa.pig b/sample-apps/blog-recommendation/src/pig/feed_content_and_tensor_vespa.pig new file mode 100644 index 00000000000..9a536f38779 --- /dev/null +++ b/sample-apps/blog-recommendation/src/pig/feed_content_and_tensor_vespa.pig @@ -0,0 +1,100 @@ +REGISTER vespa-hadoop.jar + +-- Create valid Vespa put operations +DEFINE VespaPutOperationDoc + com.yahoo.vespa.hadoop.pig.VespaDocumentOperation( + 'operation=put', + 'docid=id:blog-recommendation:blog_post::<post_id>', + 'create-tensor-fields=user_item_cf', + 'simple-array-fields=tags,categories' + ); + +DEFINE VespaPutOperationUser + com.yahoo.vespa.hadoop.pig.VespaDocumentOperation( + 'operation=put', + 'docid=id:blog-recommendation:user::<user_id>', + 'create-tensor-fields=user_item_cf', + 'simple-array-fields=has_read_items' + ); + +-- Transform tabular data to a Vespa document operation JSON format +DEFINE VespaStorage + com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- DEFINE VespaStorage +-- com.yahoo.vespa.hadoop.pig.VespaStorage( +-- 'create-document-operation=true', +-- 'operation=put', +-- 'docid=id:blog-recommendation:blog_post::<post_id>' +-- ); + +-- Load data from any source - here we load using PigStorage +data = LOAD 'blog-recommendation/trainPostsFinal' USING JsonLoader('date_gmt:chararray, language:chararray, author:chararray, url:chararray, title:chararray, blog:chararray, post_id:chararray, tags:{T:(tag_name:chararray)}, blogname:chararray, date:chararray, content:chararray, categories:{T:(category_name:chararray)}, likes:{T:(dt:chararray, uid:chararray)}'); + +data_for_feed = FOREACH data GENERATE + date_gmt, + language, + author, + url, + title, + blog, + post_id, + tags, + blogname, + content, + categories; + +data_doc = LOAD 'blog-recommendation/user_item_cf/product_features' USING JsonLoader('post_id:chararray, user_item_cf:[double]'); + +data_content_and_doc_tensor = JOIN data_for_feed BY post_id LEFT, data_doc BY post_id; +data_content_and_doc_tensor = FOREACH data_content_and_doc_tensor GENERATE + date_gmt AS date_gmt, + language AS language, + author AS author, + url AS url, + title AS title, + blog AS blog, + data_for_feed::post_id as post_id, + tags AS tags, + blogname AS blogname, + content AS content, + categories AS categories, + user_item_cf AS user_item_cf, + (user_item_cf IS NOT NULL ? 1 : 0) AS has_user_item_cf; + +data_content_and_doc_tensor_feed = FOREACH data_content_and_doc_tensor GENERATE VespaPutOperationDoc(*); + +-- use cf latent factor +data_user = LOAD 'blog-recommendation/user_item_cf/user_features' USING JsonLoader('user_id:chararray, user_item_cf:[double]'); +data_user = FOREACH data_user GENERATE + user_id AS user_id, + user_item_cf AS user_item_cf; + + +-- Articles already liked +data_likes = FOREACH data GENERATE post_id, FLATTEN(likes) AS (dt, uid); + +post_liked_per_user = GROUP data_likes BY uid; +post_liked_per_user = FOREACH post_liked_per_user GENERATE group AS user_id, data_likes.post_id AS has_read_items; + +-- Join user data +data_user = JOIN post_liked_per_user BY user_id FULL, data_user BY user_id; + +data_user = FOREACH data_user GENERATE (post_liked_per_user::user_id IS NOT NULL ? post_liked_per_user::user_id : data_user::user_id) AS user_id, + user_item_cf AS user_item_cf, + (user_item_cf IS NOT NULL ? 1 : 0) AS has_user_item_cf, + has_read_items AS has_read_items; +data_user = FILTER data_user BY user_id IS NOT NULL; + +data_user_for_feed = FOREACH data_user GENERATE VespaPutOperationUser(*); + +joint_content_tensors = UNION data_content_and_doc_tensor_feed, data_user_for_feed; + +-- STORE data_for_feed into 'vespa_put_operation'; + +-- Store into Vespa +STORE joint_content_tensors INTO '$ENDPOINT' USING VespaStorage(); + + + + diff --git a/sample-apps/blog-recommendation/src/pig/feed_content_vespa.pig b/sample-apps/blog-recommendation/src/pig/feed_content_vespa.pig new file mode 100644 index 00000000000..59b173e16f4 --- /dev/null +++ b/sample-apps/blog-recommendation/src/pig/feed_content_vespa.pig @@ -0,0 +1,71 @@ +REGISTER vespa-hadoop.jar + +-- UDF to create valid Vespa document operation in JSON format +DEFINE VespaPutOperationDoc + com.yahoo.vespa.hadoop.pig.VespaDocumentOperation( + 'operation=put', + 'docid=id:blog-search:blog_post::<post_id>', + 'simple-array-fields=tags,categories' + ); + +-- UDF to send data to a Vespa endpoint +DEFINE VespaStorage + com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load data from any source - here we load using JsonLoader +data_first_release = LOAD 'blog-recommendation/first_release/trainPosts.json' USING + JsonLoader('date_gmt:chararray, + language:chararray, + author:chararray, + url:chararray, + title:chararray, + blog:chararray, + post_id:chararray, + tags:{T:(tag_name:chararray)}, + blogname:chararray, + date:chararray, + content:chararray, + categories:{T:(category_name:chararray)}, + likes:{T:(dt:chararray, uid:chararray)}'); + +data_second_release = LOAD 'blog-recommendation/second_release/trainPosts.json' USING + JsonLoader('date_gmt:chararray, + language:chararray, + author:chararray, + url:chararray, + title:chararray, + blog:chararray, + post_id:chararray, + tags:{T:(tag_name:chararray)}, + blogname:chararray, + date:chararray, + content:chararray, + categories:{T:(category_name:chararray)}, + likes:{T:(dt:chararray, uid:chararray)}'); + +data = UNION data_first_release, data_second_release; + +-- Select fields that will be sent to Vespa. +-- This should follow blog_post.sd +data_for_feed = FOREACH data GENERATE + date_gmt, + language, + author, + url, + title, + blog, + post_id, + tags, + blogname, + content, + categories; + +-- Create valid Vespa put operations in JSON format +data_for_feed_json = FOREACH data_for_feed GENERATE VespaPutOperationDoc(*); + +-- Sample Vespa operations +-- data_for_feed_json_sample = SAMPLE data_for_feed_json 0.0005; +-- STORE data_for_feed_json_sample INTO 'blog-sample'; + +-- Store into Vespa +STORE data_for_feed_json INTO '$ENDPOINT' USING VespaStorage();
\ No newline at end of file diff --git a/sample-apps/blog-recommendation/src/pig/feed_user_item_cf_vespa.pig b/sample-apps/blog-recommendation/src/pig/feed_user_item_cf_vespa.pig new file mode 100644 index 00000000000..fd06394c3af --- /dev/null +++ b/sample-apps/blog-recommendation/src/pig/feed_user_item_cf_vespa.pig @@ -0,0 +1,37 @@ +REGISTER vespa-hadoop.jar + +-- Create valid Vespa put operations +DEFINE VespaPutOperationDoc + com.yahoo.vespa.hadoop.pig.VespaDocumentOperation( + 'operation=put', + 'docid=id:blog-recommendation:blog_post::<post_id>', + 'create-tensor-fields=user_item_cf' + ); + +DEFINE VespaPutOperationUser + com.yahoo.vespa.hadoop.pig.VespaDocumentOperation( + 'operation=put', + 'docid=id:blog-recommendation:user::<user_id>', + 'create-tensor-fields=user_item_cf' + ); + +-- Transform tabular data to a Vespa document operation JSON format +DEFINE VespaStorage + com.yahoo.vespa.hadoop.pig.VespaStorage(); + + +data_doc = LOAD 'blog-recommendation/user_item_cf/product_features' USING JsonLoader('post_id:chararray, user_item_cf:[double]'); +data_doc_for_feed = FOREACH data_doc GENERATE VespaPutOperationDoc(*); + + +data_user = LOAD 'blog-recommendation/user_item_cf/user_features' USING JsonLoader('user_id:chararray, user_item_cf:[double]'); +data_user_for_feed = FOREACH data_user GENERATE VespaPutOperationUser(*); + + +-- Store into Vespa +STORE data_doc_for_feed INTO '$ENDPOINT' USING VespaStorage(); +STORE data_user_for_feed INTO '$ENDPOINT' USING VespaStorage(); + + + + diff --git a/sample-apps/blog-recommendation/src/pig/generate_user_item_cf_dataset.pig b/sample-apps/blog-recommendation/src/pig/generate_user_item_cf_dataset.pig new file mode 100644 index 00000000000..2e71dcbe9e3 --- /dev/null +++ b/sample-apps/blog-recommendation/src/pig/generate_user_item_cf_dataset.pig @@ -0,0 +1,15 @@ +-- Load data from any source - here we load using PigStorage +data = LOAD 'blog-recommendation/trainPostsFinal' USING JsonLoader('date_gmt:chararray, language:chararray, author:chararray, url:chararray, title:chararray, blog:chararray, post_id:chararray, tags:{T:(tag_name:chararray)}, blogname:chararray, date:chararray, content:chararray, categories:{T:(category_name:chararray)}, likes:{T:(dt:chararray, uid:chararray)}'); + +data_likes = FOREACH data GENERATE post_id, FLATTEN(likes) AS (dt, uid); + +data_cf = FOREACH data_likes GENERATE uid, post_id, 1 as rate; + +data_cf = FILTER data_cf BY (uid IS NOT NULL) AND (uid != '') AND (post_id IS NOT NULL) AND (post_id != ''); + +-- data_cf_sample = SAMPLE data_cf 0.001; + +-- data_cf = LIMIT data_cf 10; + +STORE data_cf INTO 'blog-recommendation/trainPostsFinal_user_item_cf'; + diff --git a/sample-apps/blog-recommendation/src/pig/get_recommendations.pig b/sample-apps/blog-recommendation/src/pig/get_recommendations.pig new file mode 100644 index 00000000000..00b03b0f49a --- /dev/null +++ b/sample-apps/blog-recommendation/src/pig/get_recommendations.pig @@ -0,0 +1,29 @@ +-- REGISTER $VESPA_HADOOP_JAR +REGISTER vespa-hadoop.jar +-- REGISTER parquet-pig-bundle-1.8.1.jar + +-- Define Vespa query for retrieving blog posts +DEFINE BlogPostRecommendations + com.yahoo.vespa.hadoop.pig.VespaQuery( + 'query=http://ENDPOINT:8080/search/?user_id=<userid>&hits=100', + 'schema=rank:int,id:chararray,relevance:double,fields/post_id:chararray' + ); + +-- Load test_set data from a local file +test_set = LOAD 'data/cv/test_set_exploded' AS (post_id:chararray, userid:chararray); +users = FOREACH test_set GENERATE userid; +users = FILTER users BY userid IS NOT null; +users = DISTINCT users; + +users_limit = LIMIT users 10; + +-- Run a set of queries against Vespa +recommendations = FOREACH users_limit GENERATE userid, + FLATTEN(BlogPostRecommendations(*)) AS (rank, id, relevance, post_id); +recommendations = FOREACH recommendations GENERATE userid, rank, post_id; + +recommendations = FILTER recommendations BY rank IS NOT NULL AND post_id IS NOT NULL; + +-- Output recommendations +STORE recommendations INTO 'data/recommendations' USING PigStorage('\t', '-schema'); +-- STORE recommendations INTO 'data/recommendations' USING org.apache.parquet.pig.ParquetStorer(); diff --git a/sample-apps/blog-recommendation/src/pig/tutorial_blog_popularity.pig b/sample-apps/blog-recommendation/src/pig/tutorial_blog_popularity.pig new file mode 100644 index 00000000000..4dac36a717f --- /dev/null +++ b/sample-apps/blog-recommendation/src/pig/tutorial_blog_popularity.pig @@ -0,0 +1,55 @@ +REGISTER '$VESPA_HADOOP_JAR' + +-- UDF to create valid Vespa document operation in JSON format +DEFINE VespaUpdateOperationDoc + com.yahoo.vespa.hadoop.pig.VespaDocumentOperation( + 'operation=update', + 'docid=id:blog-search:blog_post::<post_id>' + ); + +-- UDF to send data to a Vespa endpoint +DEFINE VespaStorage + com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load data from any source - here we load using JsonLoader +data = LOAD '$DATA_PATH' USING + JsonLoader('date_gmt:chararray, + language:chararray, + author:chararray, + url:chararray, + title:chararray, + blog:chararray, + post_id:chararray, + tags:{T:(tag_name:chararray)}, + blogname:chararray, + date:chararray, + content:chararray, + categories:{T:(category_name:chararray)}, + likes:{T:(dt:chararray, uid:chararray)}'); + +data = FILTER data BY likes IS NOT NULL; + +data_likes = FOREACH data GENERATE + blog, + post_id, + blogname, + FLATTEN(likes) AS (dt, uid); + +-- data_likes_limit = LIMIT data_likes 10; + +likes = FOREACH (GROUP data_likes ALL) + GENERATE COUNT(data_likes) as total_number; + +blog_popularity = FOREACH (GROUP data_likes BY blog) GENERATE + group as blog, + (double)COUNT(data_likes)/(double)likes.total_number AS popularity; + +data_update = JOIN data_likes BY blog, blog_popularity BY blog; +data_update = FOREACH data_update GENERATE + post_id, popularity; + +-- Create valid Vespa put operations in JSON format +data_for_feed_json = FOREACH data_update GENERATE VespaUpdateOperationDoc(*); + +-- Store into Vespa +STORE data_for_feed_json INTO '$ENDPOINT' USING VespaStorage(); diff --git a/sample-apps/blog-recommendation/src/pig/tutorial_feed_content_and_tensor_vespa.pig b/sample-apps/blog-recommendation/src/pig/tutorial_feed_content_and_tensor_vespa.pig new file mode 100644 index 00000000000..77943fd842a --- /dev/null +++ b/sample-apps/blog-recommendation/src/pig/tutorial_feed_content_and_tensor_vespa.pig @@ -0,0 +1,116 @@ +REGISTER '$VESPA_HADOOP_JAR' + +-- Create valid Vespa put operations +DEFINE VespaPutOperationDoc + com.yahoo.vespa.hadoop.pig.VespaDocumentOperation( + 'operation=put', + 'docid=id:blog-recommendation:blog_post::<post_id>', + 'create-tensor-fields=user_item_cf', + 'simple-array-fields=tags,categories' + ); + +DEFINE VespaPutOperationUser + com.yahoo.vespa.hadoop.pig.VespaDocumentOperation( + 'operation=put', + 'docid=id:blog-recommendation:user::<user_id>', + 'create-tensor-fields=user_item_cf', + 'simple-array-fields=has_read_items' + ); + +-- Transform tabular data to a Vespa document operation JSON format +DEFINE VespaStorage + com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load data +data = LOAD '$DATA_PATH' USING + JsonLoader('date_gmt:chararray, + language:chararray, + author:chararray, + url:chararray, + title:chararray, + blog:chararray, + post_id:chararray, + tags:{T:(tag_name:chararray)}, + blogname:chararray, + date:chararray, + content:chararray, + categories:{T:(category_name:chararray)}, + likes:{T:(dt:chararray, uid:chararray)}'); + +data_for_feed = FOREACH data GENERATE + date_gmt, + language, + author, + url, + title, + blog, + post_id, + tags, + blogname, + content, + categories; + +-- Load Blog post CF latent factors +data_doc = LOAD '$BLOG_POST_FACTORS' USING + JsonLoader('post_id:chararray, + user_item_cf:[double]'); + +-- Join data and latent factors +data_content_and_doc_tensor = JOIN data_for_feed BY post_id LEFT, data_doc BY post_id; +data_content_and_doc_tensor = FOREACH data_content_and_doc_tensor GENERATE + date_gmt AS date_gmt, + language AS language, + author AS author, + url AS url, + title AS title, + blog AS blog, + data_for_feed::post_id as post_id, + tags AS tags, + blogname AS blogname, + content AS content, + categories AS categories, + user_item_cf AS user_item_cf, + (user_item_cf IS NOT NULL ? 1 : 0) AS has_user_item_cf; + +-- Generate valid Vespa JSON format +data_content_and_doc_tensor_feed = FOREACH data_content_and_doc_tensor GENERATE VespaPutOperationDoc(*); + +-- Load User CF latent factors +data_user = LOAD '$USER_FACTORS' USING + JsonLoader('user_id:chararray, + user_item_cf:[double]'); +data_user = FOREACH data_user GENERATE + user_id AS user_id, + user_item_cf AS user_item_cf; + +-- Articles already liked +data_likes = FOREACH data GENERATE post_id, FLATTEN(likes) AS (dt, uid); + +post_liked_per_user = GROUP data_likes BY uid; +post_liked_per_user = FOREACH post_liked_per_user GENERATE + group AS user_id, + data_likes.post_id AS has_read_items; + +-- Join user data +data_user = JOIN post_liked_per_user BY user_id FULL, + data_user BY user_id; + +data_user = FOREACH data_user GENERATE + (post_liked_per_user::user_id IS NOT NULL ? post_liked_per_user::user_id : data_user::user_id) AS user_id, + user_item_cf AS user_item_cf, + (user_item_cf IS NOT NULL ? 1 : 0) AS has_user_item_cf, + has_read_items AS has_read_items; + +data_user = FILTER data_user BY user_id IS NOT NULL; + +-- Generate valid Vespa JSON format +data_user_for_feed = FOREACH data_user GENERATE VespaPutOperationUser(*); + +joint_content_tensors = UNION data_content_and_doc_tensor_feed, data_user_for_feed; + +-- Store into Vespa +STORE joint_content_tensors INTO '$ENDPOINT' USING VespaStorage(); + + + + diff --git a/sample-apps/blog-recommendation/src/pig/tutorial_feed_content_vespa.pig b/sample-apps/blog-recommendation/src/pig/tutorial_feed_content_vespa.pig new file mode 100644 index 00000000000..d20ccf505a9 --- /dev/null +++ b/sample-apps/blog-recommendation/src/pig/tutorial_feed_content_vespa.pig @@ -0,0 +1,51 @@ +REGISTER '$VESPA_HADOOP_JAR' +-- REGISTER vespa-hadoop.jar + +-- UDF to create valid Vespa document operation in JSON format +DEFINE VespaPutOperationDoc + com.yahoo.vespa.hadoop.pig.VespaDocumentOperation( + 'operation=put', + 'docid=id:blog-search:blog_post::<post_id>', + 'simple-array-fields=tags,categories' + ); + +-- UDF to send data to a Vespa endpoint +DEFINE VespaStorage + com.yahoo.vespa.hadoop.pig.VespaStorage(); + +-- Load data from any source - here we load using JsonLoader +data = LOAD '$DATA_PATH' USING + JsonLoader('date_gmt:chararray, + language:chararray, + author:chararray, + url:chararray, + title:chararray, + blog:chararray, + post_id:chararray, + tags:{T:(tag_name:chararray)}, + blogname:chararray, + date:chararray, + content:chararray, + categories:{T:(category_name:chararray)}, + likes:{T:(dt:chararray, uid:chararray)}'); + +-- Select fields that will be sent to Vespa. +-- This should follow blog_post.sd +data_for_feed = FOREACH data GENERATE + date_gmt, + language, + author, + url, + title, + blog, + post_id, + tags, + blogname, + content, + categories; + +-- Create valid Vespa put operations in JSON format +data_for_feed_json = FOREACH data_for_feed GENERATE VespaPutOperationDoc(*); + +-- Store into Vespa +STORE data_for_feed_json INTO '$ENDPOINT' USING VespaStorage(); diff --git a/sample-apps/blog-recommendation/src/spark/collaborative_filtering_example.scala b/sample-apps/blog-recommendation/src/spark/collaborative_filtering_example.scala new file mode 100644 index 00000000000..1a2c8f92730 --- /dev/null +++ b/sample-apps/blog-recommendation/src/spark/collaborative_filtering_example.scala @@ -0,0 +1,59 @@ +import org.apache.spark.mllib.recommendation.ALS +import org.apache.spark.mllib.recommendation.MatrixFactorizationModel +import org.apache.spark.mllib.recommendation.Rating +import scala.util.parsing.json.JSONObject + +// Load and parse the data +val data = sc.textFile("blog-recommendation/trainPostsFinal_user_item_cf") +val ratings = data.map(_.split('\t') match { case Array(user, item, rate) => + Rating(user.toInt, item.toInt, rate.toDouble) +}) + +// Build the recommendation model using ALS +val rank = 10 +val numIterations = 10 +val model = ALS.train(ratings, rank, numIterations, 0.01) + +// Evaluate the model on rating data +val usersProducts = ratings.map { case Rating(user, product, rate) => + (user, product) +} +val predictions = + model.predict(usersProducts).map { case Rating(user, product, rate) => + ((user, product), rate) + } +val ratesAndPreds = ratings.map { case Rating(user, product, rate) => + ((user, product), rate) +}.join(predictions) +val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) => + val err = (r1 - r2) + err * err +}.mean() +println("Mean Squared Error = " + MSE) + +def writeModelFeaturesAsTensor (modelFeatures:(Int, Array[Double]), id_string:String) = { + + val id = modelFeatures._1 + val latentVector = modelFeatures._2 + var latentVectorMap:Map[String,Double] = Map() + var output:Map[String,Any] = Map() + + for ( i <- 0 until latentVector.length ){ + + latentVectorMap += (("user_item_cf:" + i.toString, latentVector(i))) + + } + + output += ((id_string, id)) + output += (("user_item_cf", scala.util.parsing.json.JSONObject(latentVectorMap))) + + JSONObject(output) + +} + +val product_features = model.productFeatures.map(x => writeModelFeaturesAsTensor(x, "post_id")) +product_features.saveAsTextFile("blog-recommendation/user_item_cf/product_features") +val user_features = model.userFeatures.map(x => writeModelFeaturesAsTensor(x, "user_id")) +user_features.saveAsTextFile("blog-recommendation/user_item_cf/user_features") + + diff --git a/sample-apps/blog-recommendation/src/spark/data_exploration.scala b/sample-apps/blog-recommendation/src/spark/data_exploration.scala new file mode 100644 index 00000000000..228834cfb4b --- /dev/null +++ b/sample-apps/blog-recommendation/src/spark/data_exploration.scala @@ -0,0 +1,63 @@ +// sc is an existing SparkContext. +val sqlContext = new org.apache.spark.sql.SQLContext(sc) + +val original_train_post_path = "blog-recommendation-support/data/original_data/trainPosts.json" +val original_train_post_thin_path = "blog-recommendation-support/data/original_data/trainPostsThin.json" +val original_test_post_thin_path = "blog-recommendation-support/data/original_data/testPostsThin.json" + +val original_train_post = sqlContext.read.json(original_train_post_path) +val original_train_post_thin = sqlContext.read.json(original_train_post_thin_path) +val original_test_post_thin = sqlContext.read.json(original_test_post_thin_path) + +val count_original_train = original_train_post.count() +val count_original_train_thin = original_train_post_thin.count() +val count_original_test_thin = original_test_post_thin.count() + +// The inferred schema can be visualized using the printSchema() method. +original_train_post.printSchema() +original_train_post_thin.printSchema() +original_test_post_thin.printSchema() + +// No intersection between train and test data +original_train_post_thin.join(original_test_post_thin, original_train_post_thin("post_id") == original_test_post_thin("post_id")).count(2) + +// original_train_minimal_df +var original_train_minimal_df = original_train_post.select($"date_gmt", $"post_id", size($"likes").as("number_likes"), $"likes") +// no duplicate post_id +original_train_minimal_df.select("post_id").dropDuplicates().count() - original_train_minimal_df.select("post_id").count() + +// CHECK THIS DECISION - I SHOULD NOT EXLUDE POST_ID WITH ZERO LIKES +// OTHERWISE THERE WILL BE NO DOCUMENT IN THE TEST SET THAT NO ONE HAS LIKED, +// WHICH MAKES THE EXERCISE MUCH EASIER +// only post_id with at least one like +// original_train_minimal_df = original_train_minimal_df.filter("number_likes > 0") + +// Set some post_id aside to be present only on the test set +var sets = original_train_minimal_df.randomSplit(Array(0.95, 0.05), 123) + +var training_set = sets(0) +var test_set = sets(1) + +// flat dataframe so that each line is a combination of post_id and user +training_set = training_set.select($"post_id", explode($"likes").as("likes_flat")) +training_set = training_set.select("post_id", "likes_flat.uid") + +test_set = test_set.select($"post_id", explode($"likes").as("likes_flat")) +test_set = test_set.select("post_id", "likes_flat.uid") + +// randomly move some (post_id, uid) from training set to test set +sets = training_set.randomSplit(Array(0.85, 0.15), 123) + +training_set = sets(0) +var additional_test_set = sets(1) + +// concatenate test_set and additional_test_set +test_set = test_set.unionAll(additional_test_set) + +// see number of likes distribution +val like_dist = original_train_minimal_df.groupBy("number_likes").count().orderBy(asc("number_likes")).collect() +like_dist.map(println) + + + + diff --git a/sample-apps/blog-recommendation/src/spark/expected_percentile.scala b/sample-apps/blog-recommendation/src/spark/expected_percentile.scala new file mode 100644 index 00000000000..986a3eb79f4 --- /dev/null +++ b/sample-apps/blog-recommendation/src/spark/expected_percentile.scala @@ -0,0 +1,39 @@ +val test_file_path = "data/cv/test_set_exploded" +val blog_recom_file_path = "data/recommendations" +val size_recommendation_list = 100 + +val sqlContext = new org.apache.spark.sql.SQLContext(sc) + +val test_set = sc.textFile(test_file_path). + map(_.split("\t")).map(p => (p(0).toString, p(1).toString)). + toDF("post_id", "user_id") + +val recommendations = sc.textFile(blog_recom_file_path). + map(_.split("\t")).map(p => (p(0).toString, p(1).toString, p(2).toString)). + toDF("user_id", "rank", "post_id") + +// val recommendations = sqlContext.createDataFrame(Seq( +// ("16966742", "5", "1009088"), +// ("30463255", "10", "1044974") +// )).toDF("user_id", "rank", "post_id") + +// join data +var joined_data = test_set. + join(recommendations, + test_set("post_id") === recommendations("post_id") && + test_set("user_id") === recommendations("user_id")). + select(test_set("post_id"), + test_set("user_id"), + recommendations("rank")) + +// transform and add a column +joined_data = joined_data.withColumn("percentile", joined_data("rank")/size_recommendation_list) + +val expected_percentile = joined_data. + // groupBy($"user_id"). + groupBy(). + agg(sum($"percentile").as("sum_percentile"), + count($"post_id").as("number_read")). + withColumn("expected_percentile", $"sum_percentile" / $"number_read") + +expected_percentile.show()
\ No newline at end of file diff --git a/sample-apps/blog-recommendation/src/spark/full_dataset_cf.scala b/sample-apps/blog-recommendation/src/spark/full_dataset_cf.scala new file mode 100644 index 00000000000..0b76e8b8b1c --- /dev/null +++ b/sample-apps/blog-recommendation/src/spark/full_dataset_cf.scala @@ -0,0 +1,60 @@ +import org.apache.spark.mllib.recommendation.ALS +import org.apache.spark.mllib.recommendation.MatrixFactorizationModel +import org.apache.spark.mllib.recommendation.Rating +import scala.util.parsing.json.JSONObject + +// Prepare data + +val data_path = "data/original_data/trainPosts.json" + +val sqlContext = new org.apache.spark.sql.SQLContext(sc) + +val full_dataset = sqlContext.read.json(data_path) + +var data = full_dataset.select($"post_id", explode($"likes").as("likes_flat")) +data = data.select($"likes_flat.uid".as("uid"), $"post_id") + +data = data.filter("uid is not null and uid != '' and post_id is not null and post_id != ''") + +val ratings = data.rdd.map(x => (x(0).toString, x(1).toString) match { + case (user, item) => Rating(user.toInt, item.toInt, 1) +}) + +// Train the model + +val rank = 10 +val numIterations = 10 +val model = ALS.train(ratings, rank, numIterations, 0.01) + +// Convert latent vectors from model to Vespa Tensor model + +def writeModelFeaturesAsTensor (modelFeatures:(Int, Array[Double]), id_string:String) = { + + val id = modelFeatures._1 + val latentVector = modelFeatures._2 + var latentVectorMap:Map[String,Double] = Map() + var output:Map[String,Any] = Map() + + for ( i <- 0 until latentVector.length ){ + + latentVectorMap += (("user_item_cf:" + i.toString, latentVector(i))) + + } + + output += ((id_string, id)) + output += (("user_item_cf", scala.util.parsing.json.JSONObject(latentVectorMap))) + + JSONObject(output) + +} + +// Write user and item latent factors to disk + +val product_features = model.productFeatures.map(x => writeModelFeaturesAsTensor(x, "post_id")) +product_features.saveAsTextFile("data/user_item_cf/product_features") +val user_features = model.userFeatures.map(x => writeModelFeaturesAsTensor(x, "user_id")) +user_features.saveAsTextFile("data/user_item_cf/user_features") + + + + diff --git a/sample-apps/blog-recommendation/src/spark/train_test_set_division.scala b/sample-apps/blog-recommendation/src/spark/train_test_set_division.scala new file mode 100644 index 00000000000..2fc67734386 --- /dev/null +++ b/sample-apps/blog-recommendation/src/spark/train_test_set_division.scala @@ -0,0 +1,45 @@ +import org.apache.spark.sql.functions.udf + +// Inputs +val input_file_path = "data/original_data/trainPosts.json" +val test_perc_stage1 = 0.05 +val test_perc_stage2 = 0.15 +val training_file_path = "data/cv/training_set_exploded" +val test_file_path = "data/cv/test_set_exploded" +val seed = 123 + +val sqlContext = new org.apache.spark.sql.SQLContext(sc) + +// Load full dataset +val full_dataset = sqlContext.read.json(input_file_path) +val full_dataset_simple = full_dataset.select($"post_id", size($"likes").as("number_likes"), $"likes") + +// Set some blog posts aside to be present only on the test set +var sets = full_dataset_simple.randomSplit(Array(1 - test_perc_stage1, test_perc_stage1), seed) + +var training_set = sets(0) +val training_set_null = training_set.filter("number_likes == 0") +var training_set_exploded = training_set.select($"post_id", explode($"likes").as("likes_flat")) +training_set_exploded = training_set_exploded.select("post_id", "likes_flat.uid") + +var test_set = sets(1) +val test_set_null = test_set.filter("number_likes == 0") +var test_set_exploded = test_set.select($"post_id", explode($"likes").as("likes_flat")) +test_set_exploded = test_set_exploded.select("post_id", "likes_flat.uid") + +// randomly move some (post_id, uid) from training set to test set +sets = training_set_exploded.randomSplit(Array(1 - test_perc_stage2, test_perc_stage2), seed) + +training_set_exploded = sets(0) + +var additional_test_set_exploded = sets(1) +test_set_exploded = test_set_exploded.unionAll(additional_test_set_exploded) + +// concatenate exploded set with null set +val getNull = udf(() => None: Option[String]) +training_set_exploded = training_set_exploded.unionAll(training_set_null.select("post_id").withColumn("uid", getNull())) +test_set_exploded = test_set_exploded.unionAll(test_set_null.select("post_id").withColumn("uid", getNull())) + +// Write to disk +training_set_exploded.rdd.map(x => x(0) + "\t" + x(1)).saveAsTextFile(training_file_path) +test_set_exploded.rdd.map(x => x(0) + "\t" + x(1)).saveAsTextFile(test_file_path)
\ No newline at end of file |