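// sample-apps/blog-recommendation/src/spark/expected_percentile.scala
//
// Evaluates blog-post recommendations against a held-out test set: for every
// (post, user) pair in the test set that also appears in the recommendations,
// take rank / size_recommendation_list, then average over all such pairs.
// Lower values mean the posts users actually read sit nearer the top of
// their recommendation lists.
//
// Meant for an interactive Spark shell (e.g. `spark-shell -i <this file>`),
// which provides the SparkContext `sc`; any environment that defines `sc`
// works. The shell invocation is our assumption, not stated in the script.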
// Input paths and the length of each user's recommendation list.
val test_file_path = "data/cv/test_set_exploded"
val blog_recom_file_path = "data/recommendations"
val size_recommendation_list = 100

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Needed for rdd.toDF(...), the $"col" syntax, and the sum/count aggregates below.
import sqlContext.implicits._
import org.apache.spark.sql.functions.{sum, count}

// Test set: tab-separated (post_id, user_id) pairs, one read post per line.
val test_set = sc.textFile(test_file_path).
  map(_.split("\t")).map(p => (p(0), p(1))).
  toDF("post_id", "user_id")

// Recommendations: tab-separated (user_id, rank, post_id) triples; rank is the
// post's position in that user's recommendation list, parsed as Int here so
// the percentile below is an explicit numeric division.
val recommendations = sc.textFile(blog_recom_file_path).
  map(_.split("\t")).map(p => (p(0), p(1).toInt, p(2))).
  toDF("user_id", "rank", "post_id")

// Small inline sample, handy for testing without the data files
// (ranks as Int to match the schema above):
// val recommendations = sqlContext.createDataFrame(Seq(
//   ("16966742", 5, "1009088"),
//   ("30463255", 10, "1044974")
// )).toDF("user_id", "rank", "post_id")

// Keep only the test pairs that actually appear in the recommendations,
// carrying the rank of each matched post along.
val joined_data = test_set.
  join(recommendations,
       test_set("post_id") === recommendations("post_id") &&
         test_set("user_id") === recommendations("user_id")).
  select(test_set("post_id"),
         test_set("user_id"),
         recommendations("rank")).
  // Position of the read post as a fraction of the list:
  // rank / size_recommendation_list. Spark's / always divides fractionally.
  withColumn("percentile", $"rank" / size_recommendation_list)

// Average the percentile over every matched (post, user) pair. groupBy() with
// no columns collapses the whole DataFrame into a single global row; swap in
// the commented groupBy($"user_id") for a per-user breakdown (spelled out in
// the sketch at the end of the file).
val expected_percentile = joined_data.
  // groupBy($"user_id").
  groupBy().
  agg(sum($"percentile").as("sum_percentile"),
      count($"post_id").as("number_read")).
  withColumn("expected_percentile", $"sum_percentile" / $"number_read")

expected_percentile.show()
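
// Per-user variant, a minimal sketch making the commented-out
// groupBy($"user_id") alternative above explicit. Column names match the
// script; the val name per_user_expected_percentile is ours, not part of
// the original job. The show() is left commented so the job's output is
// unchanged unless you opt in.
val per_user_expected_percentile = joined_data.
  groupBy($"user_id").
  agg(sum($"percentile").as("sum_percentile"),
      count($"post_id").as("number_read")).
  withColumn("expected_percentile", $"sum_percentile" / $"number_read")

// per_user_expected_percentile.show()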