1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
REGISTER '$VESPA_HADOOP_JAR'
-- Create valid Vespa put operations
DEFINE VespaPutOperationDoc
com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
'operation=put',
'docid=id:blog-recommendation:blog_post::<post_id>',
'create-tensor-fields=user_item_cf',
'simple-array-fields=tags,categories'
);
DEFINE VespaPutOperationUser
com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
'operation=put',
'docid=id:blog-recommendation:user::<user_id>',
'create-tensor-fields=user_item_cf',
'simple-array-fields=has_read_items'
);
-- Transform tabular data to a Vespa document operation JSON format
DEFINE VespaStorage
com.yahoo.vespa.hadoop.pig.VespaStorage();
-- Load data
data = LOAD '$DATA_PATH' USING
JsonLoader('date_gmt:chararray,
language:chararray,
author:chararray,
url:chararray,
title:chararray,
blog:chararray,
post_id:chararray,
tags:{T:(tag_name:chararray)},
blogname:chararray,
date:chararray,
content:chararray,
categories:{T:(category_name:chararray)},
likes:{T:(dt:chararray, uid:chararray)}');
data_for_feed = FOREACH data GENERATE
date_gmt,
language,
author,
url,
title,
blog,
post_id,
tags,
blogname,
content,
categories;
-- Load Blog post CF latent factors
data_doc = LOAD '$BLOG_POST_FACTORS' USING
JsonLoader('post_id:chararray,
user_item_cf:[double]');
-- Join data and latent factors
data_content_and_doc_tensor = JOIN data_for_feed BY post_id LEFT, data_doc BY post_id;
data_content_and_doc_tensor = FOREACH data_content_and_doc_tensor GENERATE
date_gmt AS date_gmt,
language AS language,
author AS author,
url AS url,
title AS title,
blog AS blog,
data_for_feed::post_id as post_id,
tags AS tags,
blogname AS blogname,
content AS content,
categories AS categories,
user_item_cf AS user_item_cf,
(user_item_cf IS NOT NULL ? 1 : 0) AS has_user_item_cf;
-- Generate valid Vespa JSON format
data_content_and_doc_tensor_feed = FOREACH data_content_and_doc_tensor GENERATE VespaPutOperationDoc(*);
-- Load User CF latent factors
data_user = LOAD '$USER_FACTORS' USING
JsonLoader('user_id:chararray,
user_item_cf:[double]');
data_user = FOREACH data_user GENERATE
user_id AS user_id,
user_item_cf AS user_item_cf;
-- Articles already liked
data_likes = FOREACH data GENERATE post_id, FLATTEN(likes) AS (dt, uid);
post_liked_per_user = GROUP data_likes BY uid;
post_liked_per_user = FOREACH post_liked_per_user GENERATE
group AS user_id,
data_likes.post_id AS has_read_items;
-- Join user data
data_user = JOIN post_liked_per_user BY user_id FULL,
data_user BY user_id;
data_user = FOREACH data_user GENERATE
(post_liked_per_user::user_id IS NOT NULL ? post_liked_per_user::user_id : data_user::user_id) AS user_id,
user_item_cf AS user_item_cf,
(user_item_cf IS NOT NULL ? 1 : 0) AS has_user_item_cf,
has_read_items AS has_read_items;
data_user = FILTER data_user BY user_id IS NOT NULL;
-- Generate valid Vespa JSON format
data_user_for_feed = FOREACH data_user GENERATE VespaPutOperationUser(*);
joint_content_tensors = UNION data_content_and_doc_tensor_feed, data_user_for_feed;
-- Store into Vespa
STORE joint_content_tensors INTO '$ENDPOINT' USING VespaStorage();
|