blob: 1d50f2909a2694258cb1b30aaa26086be9670286 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hadoop.pig;
import com.fasterxml.jackson.databind.JsonNode;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaHttpClient;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaQuerySchema;
import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.*;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.UDFContext;
import java.io.IOException;
import java.util.*;
/**
* A Pig UDF to run a query against a Vespa cluster and return the
* results.
*
* @author lesters
*/
public class VespaQuery extends EvalFunc<DataBag> {
private final String PROPERTY_QUERY_TEMPLATE = "query";
private final String PROPERTY_QUERY_SCHEMA = "schema";
private final String PROPERTY_ROOT_NODE = "rootnode";
private final VespaConfiguration configuration;
private final Properties properties;
private final String queryTemplate;
private final String querySchema;
private final String queryRootNode;
private VespaHttpClient httpClient;
public VespaQuery() {
this(new String[0]);
}
public VespaQuery(String... params) {
configuration = VespaConfiguration.get(UDFContext.getUDFContext().getJobConf(), null);
properties = VespaConfiguration.loadProperties(params);
queryTemplate = properties.getProperty(PROPERTY_QUERY_TEMPLATE);
if (queryTemplate == null || queryTemplate.isEmpty()) {
throw new IllegalArgumentException("Query template cannot be empty");
}
querySchema = properties.getProperty(PROPERTY_QUERY_SCHEMA, "rank:int,id:chararray");
queryRootNode = properties.getProperty(PROPERTY_ROOT_NODE, "root/children");
}
@Override
public DataBag exec(Tuple input) throws IOException {
if (input == null || input.size() == 0) {
return null;
}
JsonNode jsonResult = queryVespa(input);
if (jsonResult == null) {
return null;
}
return createPigRepresentation(jsonResult);
}
@Override
public Schema outputSchema(Schema input) {
return VespaQuerySchema.getPigSchema(querySchema);
}
private JsonNode queryVespa(Tuple input) throws IOException {
String url = createVespaQueryUrl(input);
if (url == null) {
return null;
}
String result = executeVespaQuery(url);
return parseVespaResultJson(result);
}
private String createVespaQueryUrl(Tuple input) throws IOException {
return TupleTools.toString(getInputSchema(), input, queryTemplate);
}
private String executeVespaQuery(String url) throws IOException {
if (httpClient == null) {
httpClient = new VespaHttpClient(configuration);
}
return httpClient.get(url);
}
private JsonNode parseVespaResultJson(String result) throws IOException {
return httpClient == null ? null : httpClient.parseResultJson(result, queryRootNode);
}
private DataBag createPigRepresentation(JsonNode hits) {
DataBag bag = new SortedDataBag(null);
VespaQuerySchema querySchema = new VespaQuerySchema(this.querySchema);
for (int rank = 0; rank < hits.size(); ++rank) {
JsonNode hit = hits.get(rank);
Tuple tuple = querySchema.buildTuple(rank, hit);
bag.add(tuple);
}
return bag;
}
}
|