diff options
-rw-r--r-- | vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java | 131 | ||||
-rw-r--r-- | vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java | 4 |
2 files changed, 110 insertions, 25 deletions
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java index 7e16979056c..fa463a77923 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java @@ -13,6 +13,7 @@ import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.schema.Schema; import org.joda.time.DateTime; import java.io.ByteArrayOutputStream; @@ -60,9 +61,12 @@ public class VespaDocumentOperation extends EvalFunc<String> { } + private static final String PROPERTY_CREATE_IF_NON_EXISTENT = "create-if-non-existent"; private static final String PROPERTY_ID_TEMPLATE = "docid"; private static final String PROPERTY_OPERATION = "operation"; + private static final String BAG_AS_MAP_FIELDS = "bag-as-map-fields"; private static final String SIMPLE_ARRAY_FIELDS = "simple-array-fields"; + private static final String SIMPLE_OBJECT_FIELDS = "simple-object-fields"; private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields"; private static final String EXCLUDE_FIELDS = "exclude-fields"; @@ -99,11 +103,12 @@ public class VespaDocumentOperation extends EvalFunc<String> { reporter.progress(); } - Map<String, Object> fields = TupleTools.tupleMap(getInputSchema(), tuple); + Schema inputSchema = getInputSchema(); + Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple); String docId = TupleTools.toString(fields, template); // create json - json = create(operation, docId, fields, properties); + json = create(operation, docId, fields, properties, inputSchema); if (json == null || json.length() == 0) { warn("No valid document operation could be created.", PigWarning.UDF_WARNING_1); return null; @@ -134,7 +139,8 @@ public class VespaDocumentOperation extends EvalFunc<String> { * @return A valid JSON Vespa document operation * @throws IOException ... */ - public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties) throws IOException { + public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties, + Schema schema) throws IOException { if (op == null) { return null; } @@ -152,8 +158,13 @@ public class VespaDocumentOperation extends EvalFunc<String> { g.writeStringField(op.toString(), docId); + boolean createIfNonExistent = Boolean.parseBoolean(properties.getProperty(PROPERTY_CREATE_IF_NON_EXISTENT, "false")); + if (op == Operation.UPDATE && createIfNonExistent) { + writeField("create", true, DataType.BOOLEAN, g, properties, schema, op, 0); + } + if (op != Operation.REMOVE) { - writeField("fields", fields, DataType.MAP, g, properties, op, 0); + writeField("fields", fields, DataType.MAP, g, properties, schema, op, 0); } g.writeEndObject(); @@ -164,19 +175,19 @@ public class VespaDocumentOperation extends EvalFunc<String> { @SuppressWarnings("unchecked") - private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Operation op, int depth) throws IOException { + private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth) throws IOException { if (shouldWriteField(name, properties, depth)) { g.writeFieldName(name); if (shouldWritePartialUpdate(op, depth)) { - writePartialUpdate(value, type, g, name, properties, op, depth); + writePartialUpdate(value, type, g, name, properties, schema, op, depth); } else { - writeValue(value, type, g, name, properties, op, depth); + writeValue(value, type, g, name, properties, schema, op, depth); } } } @SuppressWarnings("unchecked") - private static void writeValue(Object value, Byte type, JsonGenerator g, String name, Properties properties, Operation op, int depth) throws IOException { + private static void writeValue(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException { switch (type) { case DataType.UNKNOWN: break; @@ -225,31 +236,63 @@ public class VespaDocumentOperation extends EvalFunc<String> { String k = entry.getKey().toString(); Object v = entry.getValue(); Byte t = DataType.findType(v); - writeField(k, v, t, g, properties, op, depth+1); + Schema fieldSchema = (schema != null) ? schema.getField(k).schema : null; + writeField(k, v, t, g, properties, fieldSchema, op, depth+1); } } g.writeEndObject(); break; case DataType.TUPLE: Tuple tuple = (Tuple) value; - boolean writeStartArray = shouldWriteTupleStart(tuple, name, properties); - if (writeStartArray) { - g.writeStartArray(); - } - for (Object v : tuple) { - writeValue(v, DataType.findType(v), g, name, properties, op, depth); - } - if (writeStartArray) { - g.writeEndArray(); + if (shouldWriteTupleAsMap(name, properties)) { + Map<String, Object> fields = TupleTools.tupleMap(schema, tuple); + writeValue(fields, DataType.MAP, g, name, properties, schema, op, depth); + } else { + boolean writeStartArray = shouldWriteTupleStart(tuple, name, properties); + if (writeStartArray) { + g.writeStartArray(); + } + for (Object v : tuple) { + writeValue(v, DataType.findType(v), g, name, properties, schema, op, depth); + } + if (writeStartArray) { + g.writeEndArray(); + } } break; case DataType.BAG: DataBag bag = (DataBag) value; - g.writeStartArray(); - for (Tuple t : bag) { - writeValue(t, DataType.TUPLE, g, name, properties, op, depth); + // get the schema of the tuple in bag + schema = (schema != null) ? schema.getField(0).schema : null; + + if (shouldWriteBagAsMap(name, properties)) { + // when treating bag as map, the schema of bag should be {(key, val)....} + // the size of tuple in bag should be 2. 1st one is key. 2nd one is val. + Schema valueSchema = (schema != null) ? schema.getField(1).schema : null; + + g.writeStartObject(); + for (Tuple element : bag) { + if (element.size() != 2) { + continue; + } + String k = (String) element.get(0); + Object v = element.get(1); + Byte t = DataType.findType(v); + if (t == DataType.TUPLE) { + Map<String, Object> fields = TupleTools.tupleMap(valueSchema, (Tuple) v); + writeField(k, fields, DataType.MAP, g, properties, valueSchema, op, depth); + } else { + writeField(k, v, t, g, properties, valueSchema, op, depth); + } + } + g.writeEndObject(); + } else { + g.writeStartArray(); + for (Tuple t : bag) { + writeValue(t, DataType.TUPLE, g, name, properties, schema, op, depth); + } + g.writeEndArray(); } - g.writeEndArray(); break; } @@ -259,10 +302,10 @@ public class VespaDocumentOperation extends EvalFunc<String> { return op == Operation.UPDATE && depth == 1; } - private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Operation op, int depth) throws IOException { + private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException { g.writeStartObject(); g.writeFieldName(PARTIAL_UPDATE_ASSIGN); // TODO: lookup field name in a property to determine correct operation - writeValue(value, type, g, name, properties, op, depth); + writeValue(value, type, g, name, properties, schema, op, depth); g.writeEndObject(); } @@ -286,6 +329,46 @@ public class VespaDocumentOperation extends EvalFunc<String> { return true; } + private static boolean shouldWriteTupleAsMap(String name, Properties properties) { + if (properties == null) { + return false; + } + String simpleObjectFields = properties.getProperty(SIMPLE_OBJECT_FIELDS); + if (simpleObjectFields == null) { + return false; + } + if (simpleObjectFields.equals("*")) { + return true; + } + String[] fields = simpleObjectFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + return false; + } + + private static boolean shouldWriteBagAsMap(String name, Properties properties) { + if (properties == null) { + return false; + } + String bagAsMapFields = properties.getProperty(BAG_AS_MAP_FIELDS); + if (bagAsMapFields == null) { + return false; + } + if (bagAsMapFields.equals("*")) { + return true; + } + String[] fields = bagAsMapFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + return false; + } + private static boolean shouldCreateTensor(Map<Object, Object> map, String name, Properties properties) { if (properties == null) { return false; diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java index f0283636725..0ef90214f41 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java @@ -11,6 +11,7 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.pig.ResourceSchema; import org.apache.pig.StoreFunc; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.util.UDFContext; import java.io.*; @@ -160,7 +161,8 @@ public class VespaStorage extends StoreFunc { Map<String, Object> fields = TupleTools.tupleMap(resourceSchema, tuple); String docId = TupleTools.toString(fields, template); - return VespaDocumentOperation.create(operation, docId, fields, properties); + Schema schema = Schema.getPigSchema(resourceSchema); + return VespaDocumentOperation.create(operation, docId, fields, properties, schema); } |