diff options
Diffstat (limited to 'vespa-hadoop/src/main')
-rw-r--r-- | vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java | 249 |
1 files changed, 221 insertions, 28 deletions
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java index 57b01085460..94176bbb658 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java @@ -12,6 +12,7 @@ import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.tools.pigstats.PigStatusReporter; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.joda.time.DateTime; @@ -70,16 +71,45 @@ public class VespaDocumentOperation extends EvalFunc<String> { private static final String SIMPLE_ARRAY_FIELDS = "simple-array-fields"; private static final String SIMPLE_OBJECT_FIELDS = "simple-object-fields"; private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields"; + private static final String REMOVE_TENSOR_FIELDS = "remove-tensor-fields"; + private static final String UPDATE_TENSOR_FIELDS = "update-tensor-fields"; + private static final String REMOVE_MAP_FIELDS = "remove-map-fields"; + private static final String UPDATE_MAP_FIELDS = "update-map-fields"; private static final String EXCLUDE_FIELDS = "exclude-fields"; private static final String TESTSET_CONDITION = "condition"; - private static final String PARTIAL_UPDATE_ASSIGN = "assign"; + private static final String PARTIAL_UPDATE_ADD = "add"; + private static final String PARTIAL_UPDATE_REMOVE = "remove"; + + private static Map<String, String> mapPartialOperationMap; + + static { + mapPartialOperationMap = new HashMap<>(); + mapPartialOperationMap.put(REMOVE_MAP_FIELDS, PARTIAL_UPDATE_REMOVE); + mapPartialOperationMap.put(UPDATE_MAP_FIELDS, PARTIAL_UPDATE_ASSIGN); + } + + private static Map<String, String> partialOperationMap; + + static { + partialOperationMap = new HashMap<>(); + partialOperationMap.put(REMOVE_TENSOR_FIELDS, PARTIAL_UPDATE_REMOVE); + partialOperationMap.put(UPDATE_TENSOR_FIELDS, PARTIAL_UPDATE_ADD); + partialOperationMap.put(REMOVE_MAP_FIELDS, PARTIAL_UPDATE_REMOVE); + partialOperationMap.put(UPDATE_MAP_FIELDS, PARTIAL_UPDATE_ASSIGN); + } private final String template; private final Operation operation; private final Properties properties; + private PigStatusReporter statusReporter; public VespaDocumentOperation(String... params) { + statusReporter = PigStatusReporter.getInstance(); + if (statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation ok", 0); + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 0); + } properties = VespaConfiguration.loadProperties(params); template = properties.getProperty(PROPERTY_ID_TEMPLATE); operation = Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put")); @@ -88,9 +118,15 @@ public class VespaDocumentOperation extends EvalFunc<String> { @Override public String exec(Tuple tuple) throws IOException { if (tuple == null || tuple.size() == 0) { + if (statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1); + } return null; } if (template == null || template.length() == 0) { + if (statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1); + } warnLog("No valid document id template found. Skipping.", PigWarning.UDF_WARNING_1); return null; } @@ -109,7 +145,6 @@ public class VespaDocumentOperation extends EvalFunc<String> { Schema inputSchema = getInputSchema(); Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple); String docId = TupleTools.toString(fields, template); - // create json json = create(operation, docId, fields, properties, inputSchema); if (json == null || json.length() == 0) { @@ -119,6 +154,9 @@ public class VespaDocumentOperation extends EvalFunc<String> { } catch (Exception e) { + if (statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1); + } StringBuilder sb = new StringBuilder(); sb.append("Caught exception processing input row: \n"); sb.append(tuple.toString()); @@ -127,7 +165,9 @@ public class VespaDocumentOperation extends EvalFunc<String> { warnLog(sb.toString(), PigWarning.UDF_WARNING_4); return null; } - + if (statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation ok", 1); + } return json; } @@ -136,14 +176,14 @@ public class VespaDocumentOperation extends EvalFunc<String> { * Create a JSON Vespa document operation given the supplied fields, * operation and document id template. * - * @param op Operation (put, remove, update) - * @param docId Document id - * @param fields Fields to put in document operation - * @return A valid JSON Vespa document operation + * @param op Operation (put, remove, update) + * @param docId Document id + * @param fields Fields to put in document operation + * @return A valid JSON Vespa document operation * @throws IOException ... */ public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties, - Schema schema) throws IOException { + Schema schema) throws IOException { if (op == null) { return null; } @@ -180,15 +220,72 @@ public class VespaDocumentOperation extends EvalFunc<String> { return out.toString(); } + private static String getPartialOperation(Map<String, String> operationMap, String name, Properties properties) { + // This function checks if the property of the name falls into the map provided + // if yes, return the desired operation. if no, return null + // for example, input: + // operationMap map{"update-map-fields":"assign","remove-map-fields":"remove"} + // name date + // properties "update-map-fields":"date,month" + // output: assign + for (String label : operationMap.keySet()) { + if (properties.getProperty(label) != null) { + String[] p = properties.getProperty(label).split(","); + if (Arrays.asList(p).contains(name)) { + return operationMap.get(label); + } + } + } + return null; + } @SuppressWarnings("unchecked") private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth) throws IOException { if (shouldWriteField(name, properties, depth)) { - g.writeFieldName(name); - if (shouldWritePartialUpdate(op, depth)) { - writePartialUpdate(value, type, g, name, properties, schema, op, depth); + String operation = getPartialOperation(mapPartialOperationMap, name, properties); + // check if the name has the property update-map-fields/remove-map-fields + // if yes, we need special treatments here as we need to loop through the tuple + // be aware the the operation here is not vespa operation such as "put" and "update" + // operation here are the field name we wish use to such as "assign" and "remove" + if (operation != null) { + writePartialUpdateAndRemoveMap(name, value, g, properties, schema, op, depth, operation); } else { - writeValue(value, type, g, name, properties, schema, op, depth); + g.writeFieldName(name); + if (shouldWritePartialUpdate(op, depth)) { + writePartialUpdate(value, type, g, name, properties, schema, op, depth); + } else { + writeValue(value, type, g, name, properties, schema, op, depth); + } + } + + } + } + + private static void writePartialUpdateAndRemoveMap(String name, Object value, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth, String operation) throws IOException { + schema = (schema != null) ? schema.getField(0).schema : null; + // extract the key of map and keys in map for writing json when partial updating maps + Schema valueSchema = (schema != null) ? schema.getField(1).schema : null; + // data format { ( key; id, value: (abc,123,(123234,bbaa))) } + // the first element of each tuple in the bag will be the map to update + // the second element of each tuple in the bag will be the new value of the map + DataBag bag = (DataBag) value; + for (Tuple element : bag) { + if (element.size() != 2) { + continue; + } + String k = (String) element.get(0); + Object v = element.get(1); + Byte t = DataType.findType(v); + if (t == DataType.TUPLE) { + g.writeFieldName(name + "{" + k + "}"); + if (operation.equals(PARTIAL_UPDATE_REMOVE)) { + g.writeStartObject(); + g.writeFieldName(PARTIAL_UPDATE_REMOVE); + g.writeNumber(0); + g.writeEndObject(); + } else { + writePartialUpdate(v, t, g, name, properties, valueSchema, op, depth); + } } } } @@ -237,14 +334,18 @@ public class VespaDocumentOperation extends EvalFunc<String> { g.writeStartObject(); Map<Object, Object> map = (Map<Object, Object>) value; if (shouldCreateTensor(map, name, properties)) { - writeTensor(map, g); + if (isRemoveTensor(name, properties)) { + writeRemoveTensor(map, g); + } else { + writeTensor(map, g); + } } else { for (Map.Entry<Object, Object> entry : map.entrySet()) { String k = entry.getKey().toString(); Object v = entry.getValue(); - Byte t = DataType.findType(v); + Byte t = DataType.findType(v); Schema fieldSchema = (schema != null) ? schema.getField(k).schema : null; - writeField(k, v, t, g, properties, fieldSchema, op, depth+1); + writeField(k, v, t, g, properties, fieldSchema, op, depth + 1); } } g.writeEndObject(); @@ -271,7 +372,6 @@ public class VespaDocumentOperation extends EvalFunc<String> { DataBag bag = (DataBag) value; // get the schema of the tuple in bag schema = (schema != null) ? schema.getField(0).schema : null; - if (shouldWriteBagAsMap(name, properties)) { // when treating bag as map, the schema of bag should be {(key, val)....} // the size of tuple in bag should be 2. 1st one is key. 2nd one is val. @@ -287,9 +387,9 @@ public class VespaDocumentOperation extends EvalFunc<String> { Byte t = DataType.findType(v); if (t == DataType.TUPLE) { Map<String, Object> fields = TupleTools.tupleMap(valueSchema, (Tuple) v); - writeField(k, fields, DataType.MAP, g, properties, valueSchema, op, depth); + writeField(k, fields, DataType.MAP, g, properties, valueSchema, op, depth + 1); } else { - writeField(k, v, t, g, properties, valueSchema, op, depth); + writeField(k, v, t, g, properties, valueSchema, op, depth + 1); } } g.writeEndObject(); @@ -311,7 +411,15 @@ public class VespaDocumentOperation extends EvalFunc<String> { private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException { g.writeStartObject(); - g.writeFieldName(PARTIAL_UPDATE_ASSIGN); // TODO: lookup field name in a property to determine correct operation + // here we check if the operation falls into the four partial operations we do on map/tensor structure + // if no, we assume it's a update on the whole document and we write assign here + // if yes, we write the desired operation here + String operation = getPartialOperation(partialOperationMap, name, properties); + if (operation != null) { + g.writeFieldName(operation); + } else { + g.writeFieldName(PARTIAL_UPDATE_ASSIGN); + } writeValue(value, type, g, name, properties, schema, op, depth); g.writeEndObject(); } @@ -337,21 +445,38 @@ public class VespaDocumentOperation extends EvalFunc<String> { } private static boolean shouldWriteTupleAsMap(String name, Properties properties) { + // include UPDATE_MAP_FIELDS here because when updating the map + // the second element in each tuple should be written as a map if (properties == null) { return false; } + String addBagAsMapFields = properties.getProperty(UPDATE_MAP_FIELDS); String simpleObjectFields = properties.getProperty(SIMPLE_OBJECT_FIELDS); - if (simpleObjectFields == null) { + if (simpleObjectFields == null && addBagAsMapFields == null) { return false; } - if (simpleObjectFields.equals("*")) { - return true; + if (addBagAsMapFields != null) { + if (addBagAsMapFields.equals("*")) { + return true; + } + String[] fields = addBagAsMapFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + } - String[] fields = simpleObjectFields.split(","); - for (String field : fields) { - if (field.trim().equalsIgnoreCase(name)) { + if (simpleObjectFields != null) { + if (simpleObjectFields.equals("*")) { return true; } + String[] fields = simpleObjectFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } } return false; } @@ -380,11 +505,50 @@ public class VespaDocumentOperation extends EvalFunc<String> { if (properties == null) { return false; } - String tensorFields = properties.getProperty(CREATE_TENSOR_FIELDS); - if (tensorFields == null) { + String createTensorFields = properties.getProperty(CREATE_TENSOR_FIELDS); + String addTensorFields = properties.getProperty(UPDATE_TENSOR_FIELDS); + String removeTensorFields = properties.getProperty(REMOVE_TENSOR_FIELDS); + + if (createTensorFields == null && addTensorFields == null && removeTensorFields == null) { return false; } - String[] fields = tensorFields.split(","); + String[] fields; + if (createTensorFields != null) { + fields = createTensorFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + } + if (addTensorFields != null) { + fields = addTensorFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + } + if (removeTensorFields != null) { + fields = removeTensorFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + } + return false; + } + + private static boolean isRemoveTensor(String name, Properties properties) { + if (properties == null) { + return false; + } + String removeTensorFields = properties.getProperty(REMOVE_TENSOR_FIELDS); + if (removeTensorFields == null) { + return false; + } + String[] fields = removeTensorFields.split(","); for (String field : fields) { if (field.trim().equalsIgnoreCase(name)) { return true; @@ -451,6 +615,35 @@ public class VespaDocumentOperation extends EvalFunc<String> { g.writeEndArray(); } + private static void writeRemoveTensor(Map<Object, Object> map, JsonGenerator g) throws IOException { + g.writeFieldName("addresses"); + g.writeStartArray(); + for (Map.Entry<Object, Object> entry : map.entrySet()) { + String k = entry.getKey().toString(); + String[] dimensions = k.split(","); + for (String dimension : dimensions) { + g.writeStartObject(); + if (dimension == null || dimension.isEmpty()) { + continue; + } + String[] address = dimension.split(":"); + if (address.length != 2) { + throw new IllegalArgumentException("Malformed cell address: " + dimension); + } + String dim = address[0]; + String label = address[1]; + if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) { + throw new IllegalArgumentException("Malformed cell address: " + dimension); + } + g.writeFieldName(dim.trim()); + g.writeString(label.trim()); + g.writeEndObject(); + // Write address + } + } + g.writeEndArray(); + } + // copied from vespajlib for reducing dependency and building with JDK 8 private static String getStackTraceAsString(Throwable throwable) { try (StringWriter stringWriter = new StringWriter(); |