summary | refs | log | tree | commit | diff | stats
path: root/vespa-hadoop
diff options
context:
space:
mode:
author: Chih-Hsien Cheng <chih-hsien.cheng@verizonmedia.com> 2019-12-26 15:52:04 +0800
committer: Chih-Hsien Cheng <chih-hsien.cheng@verizonmedia.com> 2019-12-26 15:52:04 +0800
commit: b875133e430b5ebf472c70b173b58ab0e842a499 (patch)
tree: c992cf525c51807426375acd67be384ba6cf777f /vespa-hadoop
parent: 28a35b8d53cbd6dda54eb141e29b12584b297410 (diff)
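Add several features to VespaDocumentOperation:
1. create-if-non-existent: when set to true, update operations are written with "create": true.
2. bag-as-map-fields: treat a bag of (key, value) tuples as a map for the listed fields ("*" for all).
3. simple-object-fields: treat a tuple as a map for the listed fields ("*" for all).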
Diffstat (limited to 'vespa-hadoop')
-rw-r--r-- vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java 131
-rw-r--r-- vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java 4
2 files changed, 110 insertions, 25 deletions
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
index 7e16979056c..fa463a77923 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
@@ -13,6 +13,7 @@ import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.joda.time.DateTime;
import java.io.ByteArrayOutputStream;
@@ -60,9 +61,12 @@ public class VespaDocumentOperation extends EvalFunc<String> {
}
+ private static final String PROPERTY_CREATE_IF_NON_EXISTENT = "create-if-non-existent";
private static final String PROPERTY_ID_TEMPLATE = "docid";
private static final String PROPERTY_OPERATION = "operation";
+ private static final String BAG_AS_MAP_FIELDS = "bag-as-map-fields";
private static final String SIMPLE_ARRAY_FIELDS = "simple-array-fields";
+ private static final String SIMPLE_OBJECT_FIELDS = "simple-object-fields";
private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields";
private static final String EXCLUDE_FIELDS = "exclude-fields";
@@ -99,11 +103,12 @@ public class VespaDocumentOperation extends EvalFunc<String> {
reporter.progress();
}
- Map<String, Object> fields = TupleTools.tupleMap(getInputSchema(), tuple);
+ Schema inputSchema = getInputSchema();
+ Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple);
String docId = TupleTools.toString(fields, template);
// create json
- json = create(operation, docId, fields, properties);
+ json = create(operation, docId, fields, properties, inputSchema);
if (json == null || json.length() == 0) {
warn("No valid document operation could be created.", PigWarning.UDF_WARNING_1);
return null;
@@ -134,7 +139,8 @@ public class VespaDocumentOperation extends EvalFunc<String> {
* @return A valid JSON Vespa document operation
* @throws IOException ...
*/
- public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties) throws IOException {
+ public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties,
+ Schema schema) throws IOException {
if (op == null) {
return null;
}
@@ -152,8 +158,13 @@ public class VespaDocumentOperation extends EvalFunc<String> {
g.writeStringField(op.toString(), docId);
+ boolean createIfNonExistent = Boolean.parseBoolean(properties.getProperty(PROPERTY_CREATE_IF_NON_EXISTENT, "false"));
+ if (op == Operation.UPDATE && createIfNonExistent) {
+ writeField("create", true, DataType.BOOLEAN, g, properties, schema, op, 0);
+ }
+
if (op != Operation.REMOVE) {
- writeField("fields", fields, DataType.MAP, g, properties, op, 0);
+ writeField("fields", fields, DataType.MAP, g, properties, schema, op, 0);
}
g.writeEndObject();
@@ -164,19 +175,19 @@ public class VespaDocumentOperation extends EvalFunc<String> {
@SuppressWarnings("unchecked")
- private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Operation op, int depth) throws IOException {
+ private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth) throws IOException {
if (shouldWriteField(name, properties, depth)) {
g.writeFieldName(name);
if (shouldWritePartialUpdate(op, depth)) {
- writePartialUpdate(value, type, g, name, properties, op, depth);
+ writePartialUpdate(value, type, g, name, properties, schema, op, depth);
} else {
- writeValue(value, type, g, name, properties, op, depth);
+ writeValue(value, type, g, name, properties, schema, op, depth);
}
}
}
@SuppressWarnings("unchecked")
- private static void writeValue(Object value, Byte type, JsonGenerator g, String name, Properties properties, Operation op, int depth) throws IOException {
+ private static void writeValue(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException {
switch (type) {
case DataType.UNKNOWN:
break;
@@ -225,31 +236,63 @@ public class VespaDocumentOperation extends EvalFunc<String> {
String k = entry.getKey().toString();
Object v = entry.getValue();
Byte t = DataType.findType(v);
- writeField(k, v, t, g, properties, op, depth+1);
+ Schema fieldSchema = (schema != null) ? schema.getField(k).schema : null;
+ writeField(k, v, t, g, properties, fieldSchema, op, depth+1);
}
}
g.writeEndObject();
break;
case DataType.TUPLE:
Tuple tuple = (Tuple) value;
- boolean writeStartArray = shouldWriteTupleStart(tuple, name, properties);
- if (writeStartArray) {
- g.writeStartArray();
- }
- for (Object v : tuple) {
- writeValue(v, DataType.findType(v), g, name, properties, op, depth);
- }
- if (writeStartArray) {
- g.writeEndArray();
+ if (shouldWriteTupleAsMap(name, properties)) {
+ Map<String, Object> fields = TupleTools.tupleMap(schema, tuple);
+ writeValue(fields, DataType.MAP, g, name, properties, schema, op, depth);
+ } else {
+ boolean writeStartArray = shouldWriteTupleStart(tuple, name, properties);
+ if (writeStartArray) {
+ g.writeStartArray();
+ }
+ for (Object v : tuple) {
+ writeValue(v, DataType.findType(v), g, name, properties, schema, op, depth);
+ }
+ if (writeStartArray) {
+ g.writeEndArray();
+ }
}
break;
case DataType.BAG:
DataBag bag = (DataBag) value;
- g.writeStartArray();
- for (Tuple t : bag) {
- writeValue(t, DataType.TUPLE, g, name, properties, op, depth);
+ // get the schema of the tuple in bag
+ schema = (schema != null) ? schema.getField(0).schema : null;
+
+ if (shouldWriteBagAsMap(name, properties)) {
+ // when treating bag as map, the schema of bag should be {(key, val)....}
+ // the size of tuple in bag should be 2. 1st one is key. 2nd one is val.
+ Schema valueSchema = (schema != null) ? schema.getField(1).schema : null;
+
+ g.writeStartObject();
+ for (Tuple element : bag) {
+ if (element.size() != 2) {
+ continue;
+ }
+ String k = (String) element.get(0);
+ Object v = element.get(1);
+ Byte t = DataType.findType(v);
+ if (t == DataType.TUPLE) {
+ Map<String, Object> fields = TupleTools.tupleMap(valueSchema, (Tuple) v);
+ writeField(k, fields, DataType.MAP, g, properties, valueSchema, op, depth);
+ } else {
+ writeField(k, v, t, g, properties, valueSchema, op, depth);
+ }
+ }
+ g.writeEndObject();
+ } else {
+ g.writeStartArray();
+ for (Tuple t : bag) {
+ writeValue(t, DataType.TUPLE, g, name, properties, schema, op, depth);
+ }
+ g.writeEndArray();
}
- g.writeEndArray();
break;
}
@@ -259,10 +302,10 @@ public class VespaDocumentOperation extends EvalFunc<String> {
return op == Operation.UPDATE && depth == 1;
}
- private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Operation op, int depth) throws IOException {
+ private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException {
g.writeStartObject();
g.writeFieldName(PARTIAL_UPDATE_ASSIGN); // TODO: lookup field name in a property to determine correct operation
- writeValue(value, type, g, name, properties, op, depth);
+ writeValue(value, type, g, name, properties, schema, op, depth);
g.writeEndObject();
}
@@ -286,6 +329,46 @@ public class VespaDocumentOperation extends EvalFunc<String> {
return true;
}
+ private static boolean shouldWriteTupleAsMap(String name, Properties properties) {
+ if (properties == null) {
+ return false;
+ }
+ String simpleObjectFields = properties.getProperty(SIMPLE_OBJECT_FIELDS);
+ if (simpleObjectFields == null) {
+ return false;
+ }
+ if (simpleObjectFields.equals("*")) {
+ return true;
+ }
+ String[] fields = simpleObjectFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static boolean shouldWriteBagAsMap(String name, Properties properties) {
+ if (properties == null) {
+ return false;
+ }
+ String bagAsMapFields = properties.getProperty(BAG_AS_MAP_FIELDS);
+ if (bagAsMapFields == null) {
+ return false;
+ }
+ if (bagAsMapFields.equals("*")) {
+ return true;
+ }
+ String[] fields = bagAsMapFields.split(",");
+ for (String field : fields) {
+ if (field.trim().equalsIgnoreCase(name)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private static boolean shouldCreateTensor(Map<Object, Object> map, String name, Properties properties) {
if (properties == null) {
return false;
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
index f0283636725..0ef90214f41 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
@@ -11,6 +11,7 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.UDFContext;
import java.io.*;
@@ -160,7 +161,8 @@ public class VespaStorage extends StoreFunc {
Map<String, Object> fields = TupleTools.tupleMap(resourceSchema, tuple);
String docId = TupleTools.toString(fields, template);
- return VespaDocumentOperation.create(operation, docId, fields, properties);
+ Schema schema = Schema.getPigSchema(resourceSchema);
+ return VespaDocumentOperation.create(operation, docId, fields, properties, schema);
}