summaryrefslogtreecommitdiffstats
path: root/sample-apps/blog-recommendation/src/spark/expected_percentile.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sample-apps/blog-recommendation/src/spark/expected_percentile.scala')
-rw-r--r-- sample-apps/blog-recommendation/src/spark/expected_percentile.scala | 39
1 file changed, 39 insertions, 0 deletions
diff --git a/sample-apps/blog-recommendation/src/spark/expected_percentile.scala b/sample-apps/blog-recommendation/src/spark/expected_percentile.scala
new file mode 100644
index 00000000000..986a3eb79f4
--- /dev/null
+++ b/sample-apps/blog-recommendation/src/spark/expected_percentile.scala
@@ -0,0 +1,39 @@
// Computes the "expected percentile" evaluation metric for blog recommendations:
// for every (post_id, user_id) pair in the held-out test set that also appears
// in a user's recommendation list, take rank / list_size (the post's percentile
// position in the list) and average it over all matched reads. Lower is better.
//
// Intended to be pasted into spark-shell: relies on the shell-provided `sc`
// and on the shell's auto-imported implicits for toDF, $-columns, sum, count.

val test_file_path = "data/cv/test_set_exploded"
val blog_recom_file_path = "data/recommendations"
// Length of every user's recommendation list; dividing a post's rank by this
// turns the rank into a percentile position within the list.
val size_recommendation_list = 100

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Test set: tab-separated lines of (post_id, user_id).
// NOTE: String.split already yields Strings, so no .toString is needed.
val test_set = sc.textFile(test_file_path).
  map(_.split("\t")).
  map(p => (p(0), p(1))).
  toDF("post_id", "user_id")

// Recommendations: tab-separated lines of (user_id, rank, post_id).
// Parse rank as a number up front so the percentile division below is a true
// numeric division instead of relying on an implicit string-to-numeric cast.
val recommendations = sc.textFile(blog_recom_file_path).
  map(_.split("\t")).
  map(p => (p(0), p(1).toDouble, p(2))).
  toDF("user_id", "rank", "post_id")

// val recommendations = sqlContext.createDataFrame(Seq(
//   ("16966742", "5", "1009088"),
//   ("30463255", "10", "1044974")
// )).toDF("user_id", "rank", "post_id")

// Keep only test pairs that actually received a recommendation, carry the rank
// of the recommended post, and derive its percentile position in the list.
// Built as one immutable val chain rather than a reassigned var.
val joined_data = test_set.
  join(recommendations,
       test_set("post_id") === recommendations("post_id") &&
       test_set("user_id") === recommendations("user_id")).
  select(test_set("post_id"),
         test_set("user_id"),
         recommendations("rank")).
  withColumn("percentile", $"rank" / size_recommendation_list)

// Global average percentile over all matched reads.
// (Swap groupBy() for groupBy($"user_id") to get a per-user metric instead.)
val expected_percentile = joined_data.
  groupBy().
  agg(sum($"percentile").as("sum_percentile"),
      count($"post_id").as("number_read")).
  withColumn("expected_percentile", $"sum_percentile" / $"number_read")

expected_percentile.show()