summaryrefslogtreecommitdiffstats
path: root/sample-apps/blog-recommendation/src/pig/tutorial_feed_content_and_tensor_vespa.pig
blob: 77943fd842ad76ffc9478eba82edc93f9e556caf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
REGISTER '$VESPA_HADOOP_JAR'

-- UDF alias that turns a tuple into a Vespa "put" operation for a blog_post
-- document. The document id is built from the tuple's post_id field.
-- NOTE(review): 'create-tensor-fields' / 'simple-array-fields' are options of
-- the vespa-hadoop UDF; presumably they emit user_item_cf as a tensor and
-- tags/categories as plain arrays -- confirm against the vespa-hadoop docs.
DEFINE VespaPutOperationDoc com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
    'operation=put',
    'docid=id:blog-recommendation:blog_post::<post_id>',
    'create-tensor-fields=user_item_cf',
    'simple-array-fields=tags,categories');

-- UDF alias that turns a tuple into a Vespa "put" operation for a user
-- document. The document id is built from the tuple's user_id field.
-- NOTE(review): presumably user_item_cf is emitted as a tensor and
-- has_read_items as a plain array -- confirm against the vespa-hadoop docs.
DEFINE VespaPutOperationUser com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
    'operation=put',
    'docid=id:blog-recommendation:user::<user_id>',
    'create-tensor-fields=user_item_cf',
    'simple-array-fields=has_read_items');

-- Alias for the vespa-hadoop storage function; the final STORE statement of
-- this script uses it to write the generated operations to '$ENDPOINT'.
DEFINE VespaStorage com.yahoo.vespa.hadoop.pig.VespaStorage();

-- Load the raw blog post records (JSON) from '$DATA_PATH'.
-- Besides scalar post metadata, each record carries three bags:
--   tags       : one tuple per tag name
--   categories : one tuple per category name
--   likes      : one (dt, uid) tuple per like; uid is used further down
--                as the user id when building user documents
-- The schema string below spans several lines inside one quoted literal;
-- keep its whitespace untouched.
data = LOAD '$DATA_PATH' USING 
  JsonLoader('date_gmt:chararray, 
              language:chararray, 
              author:chararray, 
              url:chararray, 
              title:chararray, 
              blog:chararray, 
              post_id:chararray, 
              tags:{T:(tag_name:chararray)}, 
              blogname:chararray, 
              date:chararray, 
              content:chararray, 
              categories:{T:(category_name:chararray)}, 
              likes:{T:(dt:chararray, uid:chararray)}');

-- Project the post fields that go into the blog_post feed. The `date` and
-- `likes` columns are dropped here; likes are processed separately when the
-- user documents are built.
data_for_feed = FOREACH data GENERATE
  date_gmt, language, author, url, title, blog,
  post_id, tags, blogname, content, categories;

-- Blog post CF latent factors: one record per post_id, with the factors held
-- in a map of doubles called user_item_cf.
post_factors = LOAD '$BLOG_POST_FACTORS' USING
  JsonLoader('post_id:chararray, 
              user_item_cf:[double]');

-- Left outer join: every post is kept, with a null user_item_cf when no
-- factors were loaded for it.
posts_with_factors = JOIN data_for_feed BY post_id LEFT OUTER, post_factors BY post_id;

-- Strip the relation prefixes introduced by the join so the generated fields
-- carry plain names, and add a 0/1 flag telling whether factors are present.
post_docs = FOREACH posts_with_factors GENERATE
  data_for_feed::date_gmt    AS date_gmt,
  data_for_feed::language    AS language,
  data_for_feed::author      AS author,
  data_for_feed::url         AS url,
  data_for_feed::title       AS title,
  data_for_feed::blog        AS blog,
  data_for_feed::post_id     AS post_id,
  data_for_feed::tags        AS tags,
  data_for_feed::blogname    AS blogname,
  data_for_feed::content     AS content,
  data_for_feed::categories  AS categories,
  post_factors::user_item_cf AS user_item_cf,
  (post_factors::user_item_cf IS NULL ? 0 : 1) AS has_user_item_cf;

-- One Vespa put operation per blog post
data_content_and_doc_tensor_feed = FOREACH post_docs GENERATE VespaPutOperationDoc(*);

-- User CF latent factors: one record per user_id, with the factors held in a
-- map of doubles called user_item_cf.
user_factors = LOAD '$USER_FACTORS' USING
  JsonLoader('user_id:chararray, 
              user_item_cf:[double]');
user_factors = FOREACH user_factors GENERATE user_id AS user_id, user_item_cf AS user_item_cf;

-- Posts each user has already liked: explode the likes bag into one row per
-- (post, like), then collect the post ids per liking user.
likes_flat = FOREACH data GENERATE post_id, FLATTEN(likes) AS (dt, uid);
likes_by_user = GROUP likes_flat BY uid;
read_items = FOREACH likes_by_user GENERATE
  group AS user_id,
  likes_flat.post_id AS has_read_items;

-- Full outer join: keep users that only have likes as well as users that
-- only have latent factors.
users_joined = JOIN read_items BY user_id FULL OUTER, user_factors BY user_id;

-- Take the user id from whichever side of the join supplied one, and add a
-- 0/1 flag telling whether factors are present.
users = FOREACH users_joined GENERATE
  (read_items::user_id IS NULL ? user_factors::user_id : read_items::user_id) AS user_id,
  user_factors::user_item_cf AS user_item_cf,
  (user_factors::user_item_cf IS NULL ? 0 : 1) AS has_user_item_cf,
  read_items::has_read_items AS has_read_items;

-- Rows without any user id cannot be given a document id; drop them.
users = FILTER users BY user_id IS NOT NULL;

-- One Vespa put operation per user
data_user_for_feed = FOREACH users GENERATE VespaPutOperationUser(*);

-- Merge the blog post and user put operations into a single relation and
-- write it to the Vespa endpoint.
all_feed_operations = UNION data_content_and_doc_tensor_feed, data_user_for_feed;
STORE all_feed_operations INTO '$ENDPOINT' USING VespaStorage();