summaryrefslogtreecommitdiffstats
path: root/sample-apps/blog-recommendation/src/pig/feed_content_and_tensor_vespa.pig
blob: 9a536f38779e86ed669eea57f7daf9e23565c81d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
REGISTER vespa-hadoop.jar

-- Create valid Vespa put operations
DEFINE VespaPutOperationDoc
       com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
            'operation=put',
            'docid=id:blog-recommendation:blog_post::<post_id>',
            'create-tensor-fields=user_item_cf',
            'simple-array-fields=tags,categories'
       );

DEFINE VespaPutOperationUser
       com.yahoo.vespa.hadoop.pig.VespaDocumentOperation(
            'operation=put',
            'docid=id:blog-recommendation:user::<user_id>',
            'create-tensor-fields=user_item_cf',
            'simple-array-fields=has_read_items'
       );

-- Transform tabular data to a Vespa document operation JSON format
DEFINE VespaStorage
       com.yahoo.vespa.hadoop.pig.VespaStorage();

-- DEFINE VespaStorage
--        com.yahoo.vespa.hadoop.pig.VespaStorage(
--             'create-document-operation=true',
--             'operation=put',
--             'docid=id:blog-recommendation:blog_post::<post_id>'
--        );

-- Load data from any source - here we load using PigStorage
data = LOAD 'blog-recommendation/trainPostsFinal' USING JsonLoader('date_gmt:chararray, language:chararray, author:chararray, url:chararray, title:chararray, blog:chararray, post_id:chararray, tags:{T:(tag_name:chararray)}, blogname:chararray, date:chararray, content:chararray, categories:{T:(category_name:chararray)}, likes:{T:(dt:chararray, uid:chararray)}');

data_for_feed = FOREACH data GENERATE 
  date_gmt, 
  language, 
  author, 
  url, 
  title, 
  blog, 
  post_id, 
  tags, 
  blogname,
  content, 
  categories;

data_doc = LOAD 'blog-recommendation/user_item_cf/product_features' USING JsonLoader('post_id:chararray, user_item_cf:[double]');

data_content_and_doc_tensor = JOIN data_for_feed BY post_id LEFT, data_doc BY post_id;
data_content_and_doc_tensor = FOREACH data_content_and_doc_tensor GENERATE
  date_gmt AS date_gmt, 
  language AS language, 
  author AS author, 
  url AS url, 
  title AS title, 
  blog AS blog, 
  data_for_feed::post_id as post_id, 
  tags AS tags, 
  blogname AS blogname,
  content AS content, 
  categories AS categories,
  user_item_cf AS user_item_cf,
  (user_item_cf IS NOT NULL ? 1 : 0) AS has_user_item_cf;

data_content_and_doc_tensor_feed = FOREACH data_content_and_doc_tensor GENERATE VespaPutOperationDoc(*);

-- use cf latent factor
data_user = LOAD 'blog-recommendation/user_item_cf/user_features' USING JsonLoader('user_id:chararray, user_item_cf:[double]');
data_user = FOREACH data_user GENERATE  
  user_id AS user_id,
  user_item_cf AS user_item_cf;
  

-- Articles already liked
data_likes = FOREACH data GENERATE post_id, FLATTEN(likes) AS (dt, uid); 

post_liked_per_user = GROUP data_likes BY uid;
post_liked_per_user = FOREACH post_liked_per_user GENERATE group AS user_id, data_likes.post_id AS has_read_items;

-- Join user data
data_user = JOIN post_liked_per_user BY user_id FULL, data_user BY user_id;

data_user = FOREACH data_user GENERATE (post_liked_per_user::user_id IS NOT NULL ? post_liked_per_user::user_id : data_user::user_id) AS user_id,
                                       user_item_cf AS user_item_cf,
                                       (user_item_cf IS NOT NULL ? 1 : 0) AS has_user_item_cf,
                                       has_read_items AS has_read_items;
data_user = FILTER data_user BY user_id IS NOT NULL;

data_user_for_feed = FOREACH data_user GENERATE VespaPutOperationUser(*);

joint_content_tensors = UNION data_content_and_doc_tensor_feed, data_user_for_feed;

-- STORE data_for_feed into 'vespa_put_operation';

-- Store into Vespa
STORE joint_content_tensors INTO '$ENDPOINT' USING VespaStorage();