author    Kristian Aune <kraune@yahoo-inc.com>  2017-06-06 14:26:02 +0200
committer Kristian Aune <kraune@yahoo-inc.com>  2017-06-06 14:26:02 +0200
commit    8ff2b8e72d7590ce02ec136a2f482c1e8e202961 (patch)
tree      2d59e04adc7c1272eba3da72e9484d3e05752eb6 /sample-apps
parent    ef89ead652b55d7742767aea7e9c3d9243f19336 (diff)
move support code to repo
Diffstat (limited to 'sample-apps')
-rw-r--r--  sample-apps/blog-recommendation/src/pig/feed_content_and_tensor_vespa.pig          100
-rw-r--r--  sample-apps/blog-recommendation/src/pig/feed_content_vespa.pig                       71
-rw-r--r--  sample-apps/blog-recommendation/src/pig/feed_user_item_cf_vespa.pig                  37
-rw-r--r--  sample-apps/blog-recommendation/src/pig/generate_user_item_cf_dataset.pig            15
-rw-r--r--  sample-apps/blog-recommendation/src/pig/get_recommendations.pig                      29
-rw-r--r--  sample-apps/blog-recommendation/src/pig/tutorial_blog_popularity.pig                 55
-rw-r--r--  sample-apps/blog-recommendation/src/pig/tutorial_feed_content_and_tensor_vespa.pig  116
-rw-r--r--  sample-apps/blog-recommendation/src/pig/tutorial_feed_content_vespa.pig              51
-rw-r--r--  sample-apps/blog-recommendation/src/spark/collaborative_filtering_example.scala      59
-rw-r--r--  sample-apps/blog-recommendation/src/spark/data_exploration.scala                     63
-rw-r--r--  sample-apps/blog-recommendation/src/spark/expected_percentile.scala                  39
-rw-r--r--  sample-apps/blog-recommendation/src/spark/full_dataset_cf.scala                      60
-rw-r--r--  sample-apps/blog-recommendation/src/spark/train_test_set_division.scala              45
13 files changed, 740 insertions(+), 0 deletions(-)
diff --git a/sample-apps/blog-recommendation/src/pig/feed_content_and_tensor_vespa.pig b/sample-apps/blog-recommendation/src/pig/feed_content_and_tensor_vespa.pig
new file mode 100644
index 00000000000..9a536f38779
--- /dev/null
+++ b/sample-apps/blog-recommendation/src/pig/feed_content_and_tensor_vespa.pig
@@ -0,0 +1,100 @@
+REGISTER vespa-hadoop.jar
+
+-- Create valid Vespa put operations
+DEFINE VespaPutOperationDoc
+ com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
+ 'operation=put',
+ 'docid=id:blog-recommendation:blog_post::<post_id>',
+ 'create-tensor-fields=user_item_cf',
+ 'simple-array-fields=tags,categories'
+ );
+
+DEFINE VespaPutOperationUser
+ com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
+ 'operation=put',
+ 'docid=id:blog-recommendation:user::<user_id>',
+ 'create-tensor-fields=user_item_cf',
+ 'simple-array-fields=has_read_items'
+ );
+
+-- UDF to send data to a Vespa endpoint
+DEFINE VespaStorage
+ com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- DEFINE VespaStorage
+-- com.yahoo.vespa.hadoop.pig.VespaStorage(
+-- 'create-document-operation=true',
+-- 'operation=put',
+-- 'docid=id:blog-recommendation:blog_post::<post_id>'
+-- );
+
+-- Load data from any source - here we load using JsonLoader
+data = LOAD 'blog-recommendation/trainPostsFinal' USING
+    JsonLoader('date_gmt:chararray,
+                language:chararray,
+                author:chararray,
+                url:chararray,
+                title:chararray,
+                blog:chararray,
+                post_id:chararray,
+                tags:{T:(tag_name:chararray)},
+                blogname:chararray,
+                date:chararray,
+                content:chararray,
+                categories:{T:(category_name:chararray)},
+                likes:{T:(dt:chararray, uid:chararray)}');
+
+data_for_feed = FOREACH data GENERATE
+ date_gmt,
+ language,
+ author,
+ url,
+ title,
+ blog,
+ post_id,
+ tags,
+ blogname,
+ content,
+ categories;
+
+data_doc = LOAD 'blog-recommendation/user_item_cf/product_features' USING JsonLoader('post_id:chararray, user_item_cf:[double]');
+
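+-- Left outer join: keep every post, leaving user_item_cf null for posts
+-- without latent factors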
+data_content_and_doc_tensor = JOIN data_for_feed BY post_id LEFT, data_doc BY post_id;
+data_content_and_doc_tensor = FOREACH data_content_and_doc_tensor GENERATE
+ date_gmt AS date_gmt,
+ language AS language,
+ author AS author,
+ url AS url,
+ title AS title,
+ blog AS blog,
+ data_for_feed::post_id as post_id,
+ tags AS tags,
+ blogname AS blogname,
+ content AS content,
+ categories AS categories,
+ user_item_cf AS user_item_cf,
+ (user_item_cf IS NOT NULL ? 1 : 0) AS has_user_item_cf;
+
+data_content_and_doc_tensor_feed = FOREACH data_content_and_doc_tensor GENERATE VespaPutOperationDoc(*);
+
+-- Load user CF latent factors
+data_user = LOAD 'blog-recommendation/user_item_cf/user_features' USING JsonLoader('user_id:chararray, user_item_cf:[double]');
+data_user = FOREACH data_user GENERATE
+ user_id AS user_id,
+ user_item_cf AS user_item_cf;
+
+
+-- Articles already liked
+data_likes = FOREACH data GENERATE post_id, FLATTEN(likes) AS (dt, uid);
+
+post_liked_per_user = GROUP data_likes BY uid;
+post_liked_per_user = FOREACH post_liked_per_user GENERATE group AS user_id, data_likes.post_id AS has_read_items;
+
+-- Join user data
+data_user = JOIN post_liked_per_user BY user_id FULL, data_user BY user_id;
+
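+-- Emulate COALESCE: take user_id from whichever side of the full outer join is non-null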
+data_user = FOREACH data_user GENERATE
+    (post_liked_per_user::user_id IS NOT NULL ? post_liked_per_user::user_id : data_user::user_id) AS user_id,
+ user_item_cf AS user_item_cf,
+ (user_item_cf IS NOT NULL ? 1 : 0) AS has_user_item_cf,
+ has_read_items AS has_read_items;
+data_user = FILTER data_user BY user_id IS NOT NULL;
+
+data_user_for_feed = FOREACH data_user GENERATE VespaPutOperationUser(*);
+
+joint_content_tensors = UNION data_content_and_doc_tensor_feed, data_user_for_feed;
+
+-- STORE data_for_feed into 'vespa_put_operation';
+
+-- Store into Vespa
+STORE joint_content_tensors INTO '$ENDPOINT' USING VespaStorage();
+
+
+
+
diff --git a/sample-apps/blog-recommendation/src/pig/feed_content_vespa.pig b/sample-apps/blog-recommendation/src/pig/feed_content_vespa.pig
new file mode 100644
index 00000000000..59b173e16f4
--- /dev/null
+++ b/sample-apps/blog-recommendation/src/pig/feed_content_vespa.pig
@@ -0,0 +1,71 @@
+REGISTER vespa-hadoop.jar
+
+-- UDF to create valid Vespa document operation in JSON format
+DEFINE VespaPutOperationDoc
+ com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
+ 'operation=put',
+ 'docid=id:blog-search:blog_post::<post_id>',
+ 'simple-array-fields=tags,categories'
+ );
+
+-- UDF to send data to a Vespa endpoint
+DEFINE VespaStorage
+ com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load data from any source - here we load using JsonLoader
+data_first_release = LOAD 'blog-recommendation/first_release/trainPosts.json' USING
+ JsonLoader('date_gmt:chararray,
+ language:chararray,
+ author:chararray,
+ url:chararray,
+ title:chararray,
+ blog:chararray,
+ post_id:chararray,
+ tags:{T:(tag_name:chararray)},
+ blogname:chararray,
+ date:chararray,
+ content:chararray,
+ categories:{T:(category_name:chararray)},
+ likes:{T:(dt:chararray, uid:chararray)}');
+
+data_second_release = LOAD 'blog-recommendation/second_release/trainPosts.json' USING
+ JsonLoader('date_gmt:chararray,
+ language:chararray,
+ author:chararray,
+ url:chararray,
+ title:chararray,
+ blog:chararray,
+ post_id:chararray,
+ tags:{T:(tag_name:chararray)},
+ blogname:chararray,
+ date:chararray,
+ content:chararray,
+ categories:{T:(category_name:chararray)},
+ likes:{T:(dt:chararray, uid:chararray)}');
+
+data = UNION data_first_release, data_second_release;
+
+-- Select fields that will be sent to Vespa.
+-- This should follow blog_post.sd
+data_for_feed = FOREACH data GENERATE
+ date_gmt,
+ language,
+ author,
+ url,
+ title,
+ blog,
+ post_id,
+ tags,
+ blogname,
+ content,
+ categories;
+
+-- Create valid Vespa put operations in JSON format
+data_for_feed_json = FOREACH data_for_feed GENERATE VespaPutOperationDoc(*);
+
+-- Sample Vespa operations
+-- data_for_feed_json_sample = SAMPLE data_for_feed_json 0.0005;
+-- STORE data_for_feed_json_sample INTO 'blog-sample';
+
+-- Store into Vespa
+STORE data_for_feed_json INTO '$ENDPOINT' USING VespaStorage();
\ No newline at end of file
diff --git a/sample-apps/blog-recommendation/src/pig/feed_user_item_cf_vespa.pig b/sample-apps/blog-recommendation/src/pig/feed_user_item_cf_vespa.pig
new file mode 100644
index 00000000000..fd06394c3af
--- /dev/null
+++ b/sample-apps/blog-recommendation/src/pig/feed_user_item_cf_vespa.pig
@@ -0,0 +1,37 @@
+REGISTER vespa-hadoop.jar
+
+-- Create valid Vespa put operations
+DEFINE VespaPutOperationDoc
+ com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
+ 'operation=put',
+ 'docid=id:blog-recommendation:blog_post::<post_id>',
+ 'create-tensor-fields=user_item_cf'
+ );
+
+DEFINE VespaPutOperationUser
+ com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
+ 'operation=put',
+ 'docid=id:blog-recommendation:user::<user_id>',
+ 'create-tensor-fields=user_item_cf'
+ );
+
+-- UDF to send data to a Vespa endpoint
+DEFINE VespaStorage
+ com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+
+data_doc = LOAD 'blog-recommendation/user_item_cf/product_features' USING JsonLoader('post_id:chararray, user_item_cf:[double]');
+data_doc_for_feed = FOREACH data_doc GENERATE VespaPutOperationDoc(*);
+
+
+data_user = LOAD 'blog-recommendation/user_item_cf/user_features' USING JsonLoader('user_id:chararray, user_item_cf:[double]');
+data_user_for_feed = FOREACH data_user GENERATE VespaPutOperationUser(*);
+
+
+-- Store into Vespa
+STORE data_doc_for_feed INTO '$ENDPOINT' USING VespaStorage();
+STORE data_user_for_feed INTO '$ENDPOINT' USING VespaStorage();
+
+
+
+
diff --git a/sample-apps/blog-recommendation/src/pig/generate_user_item_cf_dataset.pig b/sample-apps/blog-recommendation/src/pig/generate_user_item_cf_dataset.pig
new file mode 100644
index 00000000000..2e71dcbe9e3
--- /dev/null
+++ b/sample-apps/blog-recommendation/src/pig/generate_user_item_cf_dataset.pig
@@ -0,0 +1,15 @@
+-- Load data from any source - here we load using JsonLoader
+data = LOAD 'blog-recommendation/trainPostsFinal' USING
+    JsonLoader('date_gmt:chararray,
+                language:chararray,
+                author:chararray,
+                url:chararray,
+                title:chararray,
+                blog:chararray,
+                post_id:chararray,
+                tags:{T:(tag_name:chararray)},
+                blogname:chararray,
+                date:chararray,
+                content:chararray,
+                categories:{T:(category_name:chararray)},
+                likes:{T:(dt:chararray, uid:chararray)}');
+
+data_likes = FOREACH data GENERATE post_id, FLATTEN(likes) AS (dt, uid);
+
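+-- Each like becomes an implicit rating of 1 for its (user, post) pair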
+data_cf = FOREACH data_likes GENERATE uid, post_id, 1 as rate;
+
+data_cf = FILTER data_cf BY (uid IS NOT NULL) AND (uid != '') AND (post_id IS NOT NULL) AND (post_id != '');
+
+-- data_cf_sample = SAMPLE data_cf 0.001;
+
+-- data_cf = LIMIT data_cf 10;
+
+STORE data_cf INTO 'blog-recommendation/trainPostsFinal_user_item_cf';
+
diff --git a/sample-apps/blog-recommendation/src/pig/get_recommendations.pig b/sample-apps/blog-recommendation/src/pig/get_recommendations.pig
new file mode 100644
index 00000000000..00b03b0f49a
--- /dev/null
+++ b/sample-apps/blog-recommendation/src/pig/get_recommendations.pig
@@ -0,0 +1,29 @@
+-- REGISTER $VESPA_HADOOP_JAR
+REGISTER vespa-hadoop.jar
+-- REGISTER parquet-pig-bundle-1.8.1.jar
+
+-- Define Vespa query for retrieving blog posts
+DEFINE BlogPostRecommendations
+ com.yahoo.vespa.hadoop.pig.VespaQuery(
+ 'query=http://ENDPOINT:8080/search/?user_id=<userid>&hits=100',
+ 'schema=rank:int,id:chararray,relevance:double,fields/post_id:chararray'
+ );
+
+-- Load test_set data from a local file
+test_set = LOAD 'data/cv/test_set_exploded' AS (post_id:chararray, userid:chararray);
+users = FOREACH test_set GENERATE userid;
+users = FILTER users BY userid IS NOT NULL;
+users = DISTINCT users;
+
+users_limit = LIMIT users 10;
+
+-- Run a set of queries against Vespa
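+-- (one HTTP request per user; FLATTEN expands each returned hit into its own row)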
+recommendations = FOREACH users_limit GENERATE userid,
+ FLATTEN(BlogPostRecommendations(*)) AS (rank, id, relevance, post_id);
+recommendations = FOREACH recommendations GENERATE userid, rank, post_id;
+
+recommendations = FILTER recommendations BY rank IS NOT NULL AND post_id IS NOT NULL;
+
+-- Output recommendations
+STORE recommendations INTO 'data/recommendations' USING PigStorage('\t', '-schema');
+-- STORE recommendations INTO 'data/recommendations' USING org.apache.parquet.pig.ParquetStorer();
diff --git a/sample-apps/blog-recommendation/src/pig/tutorial_blog_popularity.pig b/sample-apps/blog-recommendation/src/pig/tutorial_blog_popularity.pig
new file mode 100644
index 00000000000..4dac36a717f
--- /dev/null
+++ b/sample-apps/blog-recommendation/src/pig/tutorial_blog_popularity.pig
@@ -0,0 +1,55 @@
+REGISTER '$VESPA_HADOOP_JAR'
+
+-- UDF to create valid Vespa document operation in JSON format
+DEFINE VespaUpdateOperationDoc
+ com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
+ 'operation=update',
+ 'docid=id:blog-search:blog_post::<post_id>'
+ );
+
+-- UDF to send data to a Vespa endpoint
+DEFINE VespaStorage
+ com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load data from any source - here we load using JsonLoader
+data = LOAD '$DATA_PATH' USING
+ JsonLoader('date_gmt:chararray,
+ language:chararray,
+ author:chararray,
+ url:chararray,
+ title:chararray,
+ blog:chararray,
+ post_id:chararray,
+ tags:{T:(tag_name:chararray)},
+ blogname:chararray,
+ date:chararray,
+ content:chararray,
+ categories:{T:(category_name:chararray)},
+ likes:{T:(dt:chararray, uid:chararray)}');
+
+data = FILTER data BY likes IS NOT NULL;
+
+data_likes = FOREACH data GENERATE
+ blog,
+ post_id,
+ blogname,
+ FLATTEN(likes) AS (dt, uid);
+
+-- data_likes_limit = LIMIT data_likes 10;
+
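+-- popularity = a blog's share of all likes in the dataset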
+likes = FOREACH (GROUP data_likes ALL)
+ GENERATE COUNT(data_likes) as total_number;
+
+blog_popularity = FOREACH (GROUP data_likes BY blog) GENERATE
+ group as blog,
+ (double)COUNT(data_likes)/(double)likes.total_number AS popularity;
+
+data_update = JOIN data_likes BY blog, blog_popularity BY blog;
+data_update = FOREACH data_update GENERATE
+ post_id, popularity;
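+-- Note: the join emits one update per like, so a post is updated multiple
+-- times; each update sets the same popularity value, so the duplicates are
+-- redundant but harmless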
+
+-- Create valid Vespa update operations in JSON format
+data_for_feed_json = FOREACH data_update GENERATE VespaUpdateOperationDoc(*);
+
+-- Store into Vespa
+STORE data_for_feed_json INTO '$ENDPOINT' USING VespaStorage();
diff --git a/sample-apps/blog-recommendation/src/pig/tutorial_feed_content_and_tensor_vespa.pig b/sample-apps/blog-recommendation/src/pig/tutorial_feed_content_and_tensor_vespa.pig
new file mode 100644
index 00000000000..77943fd842a
--- /dev/null
+++ b/sample-apps/blog-recommendation/src/pig/tutorial_feed_content_and_tensor_vespa.pig
@@ -0,0 +1,116 @@
+REGISTER '$VESPA_HADOOP_JAR'
+
+-- Create valid Vespa put operations
+DEFINE VespaPutOperationDoc
+ com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
+ 'operation=put',
+ 'docid=id:blog-recommendation:blog_post::<post_id>',
+ 'create-tensor-fields=user_item_cf',
+ 'simple-array-fields=tags,categories'
+ );
+
+DEFINE VespaPutOperationUser
+ com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
+ 'operation=put',
+ 'docid=id:blog-recommendation:user::<user_id>',
+ 'create-tensor-fields=user_item_cf',
+ 'simple-array-fields=has_read_items'
+ );
+
+-- UDF to send data to a Vespa endpoint
+DEFINE VespaStorage
+ com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load data
+data = LOAD '$DATA_PATH' USING
+ JsonLoader('date_gmt:chararray,
+ language:chararray,
+ author:chararray,
+ url:chararray,
+ title:chararray,
+ blog:chararray,
+ post_id:chararray,
+ tags:{T:(tag_name:chararray)},
+ blogname:chararray,
+ date:chararray,
+ content:chararray,
+ categories:{T:(category_name:chararray)},
+ likes:{T:(dt:chararray, uid:chararray)}');
+
+data_for_feed = FOREACH data GENERATE
+ date_gmt,
+ language,
+ author,
+ url,
+ title,
+ blog,
+ post_id,
+ tags,
+ blogname,
+ content,
+ categories;
+
+-- Load Blog post CF latent factors
+data_doc = LOAD '$BLOG_POST_FACTORS' USING
+ JsonLoader('post_id:chararray,
+ user_item_cf:[double]');
+
+-- Join data and latent factors
+data_content_and_doc_tensor = JOIN data_for_feed BY post_id LEFT, data_doc BY post_id;
+data_content_and_doc_tensor = FOREACH data_content_and_doc_tensor GENERATE
+ date_gmt AS date_gmt,
+ language AS language,
+ author AS author,
+ url AS url,
+ title AS title,
+ blog AS blog,
+ data_for_feed::post_id as post_id,
+ tags AS tags,
+ blogname AS blogname,
+ content AS content,
+ categories AS categories,
+ user_item_cf AS user_item_cf,
+ (user_item_cf IS NOT NULL ? 1 : 0) AS has_user_item_cf;
+
+-- Generate valid Vespa JSON format
+data_content_and_doc_tensor_feed = FOREACH data_content_and_doc_tensor GENERATE VespaPutOperationDoc(*);
+
+-- Load User CF latent factors
+data_user = LOAD '$USER_FACTORS' USING
+ JsonLoader('user_id:chararray,
+ user_item_cf:[double]');
+data_user = FOREACH data_user GENERATE
+ user_id AS user_id,
+ user_item_cf AS user_item_cf;
+
+-- Articles already liked
+data_likes = FOREACH data GENERATE post_id, FLATTEN(likes) AS (dt, uid);
+
+post_liked_per_user = GROUP data_likes BY uid;
+post_liked_per_user = FOREACH post_liked_per_user GENERATE
+ group AS user_id,
+ data_likes.post_id AS has_read_items;
+
+-- Join user data
+data_user = JOIN post_liked_per_user BY user_id FULL,
+ data_user BY user_id;
+
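+-- Emulate COALESCE: take user_id from whichever side of the full outer join is non-null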
+data_user = FOREACH data_user GENERATE
+ (post_liked_per_user::user_id IS NOT NULL ? post_liked_per_user::user_id : data_user::user_id) AS user_id,
+ user_item_cf AS user_item_cf,
+ (user_item_cf IS NOT NULL ? 1 : 0) AS has_user_item_cf,
+ has_read_items AS has_read_items;
+
+data_user = FILTER data_user BY user_id IS NOT NULL;
+
+-- Generate valid Vespa JSON format
+data_user_for_feed = FOREACH data_user GENERATE VespaPutOperationUser(*);
+
+joint_content_tensors = UNION data_content_and_doc_tensor_feed, data_user_for_feed;
+
+-- Store into Vespa
+STORE joint_content_tensors INTO '$ENDPOINT' USING VespaStorage();
+
+
+
+
diff --git a/sample-apps/blog-recommendation/src/pig/tutorial_feed_content_vespa.pig b/sample-apps/blog-recommendation/src/pig/tutorial_feed_content_vespa.pig
new file mode 100644
index 00000000000..d20ccf505a9
--- /dev/null
+++ b/sample-apps/blog-recommendation/src/pig/tutorial_feed_content_vespa.pig
@@ -0,0 +1,51 @@
+REGISTER '$VESPA_HADOOP_JAR'
+-- REGISTER vespa-hadoop.jar
+
+-- UDF to create valid Vespa document operation in JSON format
+DEFINE VespaPutOperationDoc
+ com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
+ 'operation=put',
+ 'docid=id:blog-search:blog_post::<post_id>',
+ 'simple-array-fields=tags,categories'
+ );
+
+-- UDF to send data to a Vespa endpoint
+DEFINE VespaStorage
+ com.yahoo.vespa.hadoop.pig.VespaStorage();
+
+-- Load data from any source - here we load using JsonLoader
+data = LOAD '$DATA_PATH' USING
+ JsonLoader('date_gmt:chararray,
+ language:chararray,
+ author:chararray,
+ url:chararray,
+ title:chararray,
+ blog:chararray,
+ post_id:chararray,
+ tags:{T:(tag_name:chararray)},
+ blogname:chararray,
+ date:chararray,
+ content:chararray,
+ categories:{T:(category_name:chararray)},
+ likes:{T:(dt:chararray, uid:chararray)}');
+
+-- Select fields that will be sent to Vespa.
+-- This should follow blog_post.sd
+data_for_feed = FOREACH data GENERATE
+ date_gmt,
+ language,
+ author,
+ url,
+ title,
+ blog,
+ post_id,
+ tags,
+ blogname,
+ content,
+ categories;
+
+-- Create valid Vespa put operations in JSON format
+data_for_feed_json = FOREACH data_for_feed GENERATE VespaPutOperationDoc(*);
+
+-- Store into Vespa
+STORE data_for_feed_json INTO '$ENDPOINT' USING VespaStorage();
diff --git a/sample-apps/blog-recommendation/src/spark/collaborative_filtering_example.scala b/sample-apps/blog-recommendation/src/spark/collaborative_filtering_example.scala
new file mode 100644
index 00000000000..1a2c8f92730
--- /dev/null
+++ b/sample-apps/blog-recommendation/src/spark/collaborative_filtering_example.scala
@@ -0,0 +1,59 @@
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
+import org.apache.spark.mllib.recommendation.Rating
+import scala.util.parsing.json.JSONObject
+
+// Load and parse the data
+val data = sc.textFile("blog-recommendation/trainPostsFinal_user_item_cf")
+val ratings = data.map(_.split('\t') match { case Array(user, item, rate) =>
+ Rating(user.toInt, item.toInt, rate.toDouble)
+})
+
+// Build the recommendation model using ALS
+val rank = 10
+val numIterations = 10
+val model = ALS.train(ratings, rank, numIterations, 0.01)
+
+// Evaluate the model on rating data
+val usersProducts = ratings.map { case Rating(user, product, rate) =>
+ (user, product)
+}
+val predictions =
+ model.predict(usersProducts).map { case Rating(user, product, rate) =>
+ ((user, product), rate)
+ }
+val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
+ ((user, product), rate)
+}.join(predictions)
+val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
+ val err = (r1 - r2)
+ err * err
+}.mean()
+println("Mean Squared Error = " + MSE)
+
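+// Serialize one (id, latentVector) pair from the model as JSON: each vector
+// element becomes a map entry keyed "user_item_cf:<i>", which the Pig feed
+// scripts turn into a Vespa tensor via their create-tensor-fields option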
+def writeModelFeaturesAsTensor (modelFeatures:(Int, Array[Double]), id_string:String) = {
+
+ val id = modelFeatures._1
+ val latentVector = modelFeatures._2
+ var latentVectorMap:Map[String,Double] = Map()
+ var output:Map[String,Any] = Map()
+
+ for ( i <- 0 until latentVector.length ){
+
+ latentVectorMap += (("user_item_cf:" + i.toString, latentVector(i)))
+
+ }
+
+ output += ((id_string, id))
+ output += (("user_item_cf", scala.util.parsing.json.JSONObject(latentVectorMap)))
+
+ JSONObject(output)
+
+}
+
+val product_features = model.productFeatures.map(x => writeModelFeaturesAsTensor(x, "post_id"))
+product_features.saveAsTextFile("blog-recommendation/user_item_cf/product_features")
+val user_features = model.userFeatures.map(x => writeModelFeaturesAsTensor(x, "user_id"))
+user_features.saveAsTextFile("blog-recommendation/user_item_cf/user_features")
+
+
diff --git a/sample-apps/blog-recommendation/src/spark/data_exploration.scala b/sample-apps/blog-recommendation/src/spark/data_exploration.scala
new file mode 100644
index 00000000000..228834cfb4b
--- /dev/null
+++ b/sample-apps/blog-recommendation/src/spark/data_exploration.scala
@@ -0,0 +1,63 @@
+// sc is an existing SparkContext.
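+// Intended for the spark-shell, whose automatic imports (sqlContext.implicits._,
+// org.apache.spark.sql.functions._) provide the $-syntax and size/explode below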
+val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+val original_train_post_path = "blog-recommendation-support/data/original_data/trainPosts.json"
+val original_train_post_thin_path = "blog-recommendation-support/data/original_data/trainPostsThin.json"
+val original_test_post_thin_path = "blog-recommendation-support/data/original_data/testPostsThin.json"
+
+val original_train_post = sqlContext.read.json(original_train_post_path)
+val original_train_post_thin = sqlContext.read.json(original_train_post_thin_path)
+val original_test_post_thin = sqlContext.read.json(original_test_post_thin_path)
+
+val count_original_train = original_train_post.count()
+val count_original_train_thin = original_train_post_thin.count()
+val count_original_test_thin = original_test_post_thin.count()
+
+// The inferred schema can be visualized using the printSchema() method.
+original_train_post.printSchema()
+original_train_post_thin.printSchema()
+original_test_post_thin.printSchema()
+
+// No intersection between train and test data
+original_train_post_thin.join(original_test_post_thin,
+  original_train_post_thin("post_id") === original_test_post_thin("post_id")).count()
+
+// original_train_minimal_df
+var original_train_minimal_df = original_train_post.select($"date_gmt", $"post_id", size($"likes").as("number_likes"), $"likes")
+// no duplicate post_id
+original_train_minimal_df.select("post_id").dropDuplicates().count() - original_train_minimal_df.select("post_id").count()
+
+// CHECK THIS DECISION - I SHOULD NOT EXCLUDE POST_IDS WITH ZERO LIKES;
+// OTHERWISE THERE WILL BE NO DOCUMENT IN THE TEST SET THAT NO ONE HAS LIKED,
+// WHICH WOULD MAKE THE EXERCISE MUCH EASIER
+// only post_id with at least one like
+// original_train_minimal_df = original_train_minimal_df.filter("number_likes > 0")
+
+// Set some post_id aside to be present only on the test set
+var sets = original_train_minimal_df.randomSplit(Array(0.95, 0.05), 123)
+
+var training_set = sets(0)
+var test_set = sets(1)
+
+// flat dataframe so that each line is a combination of post_id and user
+training_set = training_set.select($"post_id", explode($"likes").as("likes_flat"))
+training_set = training_set.select("post_id", "likes_flat.uid")
+
+test_set = test_set.select($"post_id", explode($"likes").as("likes_flat"))
+test_set = test_set.select("post_id", "likes_flat.uid")
+
+// randomly move some (post_id, uid) from training set to test set
+sets = training_set.randomSplit(Array(0.85, 0.15), 123)
+
+training_set = sets(0)
+var additional_test_set = sets(1)
+
+// concatenate test_set and additional_test_set
+test_set = test_set.unionAll(additional_test_set)
+
+// see number of likes distribution
+val like_dist = original_train_minimal_df.groupBy("number_likes").count().orderBy(asc("number_likes")).collect()
+like_dist.map(println)
+
+
+
+
diff --git a/sample-apps/blog-recommendation/src/spark/expected_percentile.scala b/sample-apps/blog-recommendation/src/spark/expected_percentile.scala
new file mode 100644
index 00000000000..986a3eb79f4
--- /dev/null
+++ b/sample-apps/blog-recommendation/src/spark/expected_percentile.scala
@@ -0,0 +1,39 @@
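+// Expected percentile: for each liked test post that appears in a user's
+// recommendation list, take rank / list size; averaging over all such pairs
+// gives the metric (lower means liked posts rank nearer the top)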
+val test_file_path = "data/cv/test_set_exploded"
+val blog_recom_file_path = "data/recommendations"
+val size_recommendation_list = 100
+
+val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+val test_set = sc.textFile(test_file_path).
+ map(_.split("\t")).map(p => (p(0).toString, p(1).toString)).
+ toDF("post_id", "user_id")
+
+val recommendations = sc.textFile(blog_recom_file_path).
+ map(_.split("\t")).map(p => (p(0).toString, p(1).toString, p(2).toString)).
+ toDF("user_id", "rank", "post_id")
+
+// val recommendations = sqlContext.createDataFrame(Seq(
+// ("16966742", "5", "1009088"),
+// ("30463255", "10", "1044974")
+// )).toDF("user_id", "rank", "post_id")
+
+// join data
+var joined_data = test_set.
+ join(recommendations,
+ test_set("post_id") === recommendations("post_id") &&
+ test_set("user_id") === recommendations("user_id")).
+ select(test_set("post_id"),
+ test_set("user_id"),
+ recommendations("rank"))
+
+// percentile: position of the liked post within the recommendation list,
+// scaled by the list size
+joined_data = joined_data.withColumn("percentile", joined_data("rank")/size_recommendation_list)
+
+val expected_percentile = joined_data.
+ // groupBy($"user_id").
+ groupBy().
+ agg(sum($"percentile").as("sum_percentile"),
+ count($"post_id").as("number_read")).
+ withColumn("expected_percentile", $"sum_percentile" / $"number_read")
+
+expected_percentile.show()
\ No newline at end of file
diff --git a/sample-apps/blog-recommendation/src/spark/full_dataset_cf.scala b/sample-apps/blog-recommendation/src/spark/full_dataset_cf.scala
new file mode 100644
index 00000000000..0b76e8b8b1c
--- /dev/null
+++ b/sample-apps/blog-recommendation/src/spark/full_dataset_cf.scala
@@ -0,0 +1,60 @@
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
+import org.apache.spark.mllib.recommendation.Rating
+import scala.util.parsing.json.JSONObject
+
+// Prepare data
+
+val data_path = "data/original_data/trainPosts.json"
+
+val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+val full_dataset = sqlContext.read.json(data_path)
+
+var data = full_dataset.select($"post_id", explode($"likes").as("likes_flat"))
+data = data.select($"likes_flat.uid".as("uid"), $"post_id")
+
+data = data.filter("uid is not null and uid != '' and post_id is not null and post_id != ''")
+
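+// Every (uid, post_id) like becomes an implicit-feedback rating of 1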
+val ratings = data.rdd.map(x => (x(0).toString, x(1).toString) match {
+ case (user, item) => Rating(user.toInt, item.toInt, 1)
+})
+
+// Train the model
+
+val rank = 10
+val numIterations = 10
+val model = ALS.train(ratings, rank, numIterations, 0.01)
+
+// Convert latent vectors from model to Vespa Tensor model
+
+def writeModelFeaturesAsTensor (modelFeatures:(Int, Array[Double]), id_string:String) = {
+
+ val id = modelFeatures._1
+ val latentVector = modelFeatures._2
+ var latentVectorMap:Map[String,Double] = Map()
+ var output:Map[String,Any] = Map()
+
+ for ( i <- 0 until latentVector.length ){
+
+ latentVectorMap += (("user_item_cf:" + i.toString, latentVector(i)))
+
+ }
+
+ output += ((id_string, id))
+ output += (("user_item_cf", scala.util.parsing.json.JSONObject(latentVectorMap)))
+
+ JSONObject(output)
+
+}
+
+// Write user and item latent factors to disk
+
+val product_features = model.productFeatures.map(x => writeModelFeaturesAsTensor(x, "post_id"))
+product_features.saveAsTextFile("data/user_item_cf/product_features")
+val user_features = model.userFeatures.map(x => writeModelFeaturesAsTensor(x, "user_id"))
+user_features.saveAsTextFile("data/user_item_cf/user_features")
+
+
+
+
diff --git a/sample-apps/blog-recommendation/src/spark/train_test_set_division.scala b/sample-apps/blog-recommendation/src/spark/train_test_set_division.scala
new file mode 100644
index 00000000000..2fc67734386
--- /dev/null
+++ b/sample-apps/blog-recommendation/src/spark/train_test_set_division.scala
@@ -0,0 +1,45 @@
+import org.apache.spark.sql.functions.udf
+
+// Inputs
+val input_file_path = "data/original_data/trainPosts.json"
+val test_perc_stage1 = 0.05
+val test_perc_stage2 = 0.15
+val training_file_path = "data/cv/training_set_exploded"
+val test_file_path = "data/cv/test_set_exploded"
+val seed = 123
+
+val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+
+// Load full dataset
+val full_dataset = sqlContext.read.json(input_file_path)
+val full_dataset_simple = full_dataset.select($"post_id", size($"likes").as("number_likes"), $"likes")
+
+// Set some blog posts aside to be present only on the test set
+var sets = full_dataset_simple.randomSplit(Array(1 - test_perc_stage1, test_perc_stage1), seed)
+
+var training_set = sets(0)
+val training_set_null = training_set.filter("number_likes == 0")
+var training_set_exploded = training_set.select($"post_id", explode($"likes").as("likes_flat"))
+training_set_exploded = training_set_exploded.select("post_id", "likes_flat.uid")
+
+var test_set = sets(1)
+val test_set_null = test_set.filter("number_likes == 0")
+var test_set_exploded = test_set.select($"post_id", explode($"likes").as("likes_flat"))
+test_set_exploded = test_set_exploded.select("post_id", "likes_flat.uid")
+
+// randomly move some (post_id, uid) from training set to test set
+sets = training_set_exploded.randomSplit(Array(1 - test_perc_stage2, test_perc_stage2), seed)
+
+training_set_exploded = sets(0)
+
+var additional_test_set_exploded = sets(1)
+test_set_exploded = test_set_exploded.unionAll(additional_test_set_exploded)
+
+// concatenate exploded set with null set
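+// (getNull yields a typed null uid, so zero-like posts keep the two-column schema)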
+val getNull = udf(() => None: Option[String])
+training_set_exploded = training_set_exploded.unionAll(training_set_null.select("post_id").withColumn("uid", getNull()))
+test_set_exploded = test_set_exploded.unionAll(test_set_null.select("post_id").withColumn("uid", getNull()))
+
+// Write to disk
+training_set_exploded.rdd.map(x => x(0) + "\t" + x(1)).saveAsTextFile(training_file_path)
+test_set_exploded.rdd.map(x => x(0) + "\t" + x(1)).saveAsTextFile(test_file_path)
\ No newline at end of file