diff options
-rw-r--r-- | vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java | 187 |
1 files changed, 114 insertions, 73 deletions
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java index 9427ae962d1..b7c58fe968c 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java @@ -6,7 +6,7 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools; import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import org.apache.commons.lang.exception.ExceptionUtils; +//import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.pig.EvalFunc; import org.apache.pig.PigWarning; import org.apache.pig.data.DataBag; @@ -14,8 +14,10 @@ import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.tools.pigstats.PigStatusReporter; import org.joda.time.DateTime; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.math.BigDecimal; @@ -74,18 +76,29 @@ public class VespaDocumentOperation extends EvalFunc<String> { private static final String ADD_BAG_AS_MAP_FIELDS = "add-bag-as-map-fields"; private static final String EXCLUDE_FIELDS = "exclude-fields"; private static final String TESTSET_CONDITION = "condition"; - - private static final String PARTIAL_UPDATE_ASSIGN = "assign"; private static final String PARTIAL_UPDATE_ADD = "add"; private static final String PARTIAL_UPDATE_REMOVE = "remove"; + private static Map<String, String> operationMap; + static { + operationMap = new HashMap<>(); + operationMap.put(REMOVE_TENSOR_FIELDS, PARTIAL_UPDATE_REMOVE); + operationMap.put(REMOVE_BAG_AS_MAP_FIELDS, PARTIAL_UPDATE_REMOVE); + operationMap.put(ADD_TENSOR_FIELDS, PARTIAL_UPDATE_ADD); + } private final String template; private final Operation operation; private final Properties properties; + private PigStatusReporter statusReporter; public VespaDocumentOperation(String... params) { + statusReporter = PigStatusReporter.getInstance(); + if(statusReporter != null){ + statusReporter.incrCounter("Vespa Document Operation Counters","Document operation ok",0); + statusReporter.incrCounter("Vespa Document Operation Counters","Document operation failed",0); + } properties = VespaConfiguration.loadProperties(params); template = properties.getProperty(PROPERTY_ID_TEMPLATE); operation = Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put")); @@ -94,13 +107,22 @@ public class VespaDocumentOperation extends EvalFunc<String> { @Override public String exec(Tuple tuple) throws IOException { if (tuple == null || tuple.size() == 0) { + if(statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1); + } return null; } if (template == null || template.length() == 0) { + if(statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1); + } warn("No valid document id template found. Skipping.", PigWarning.UDF_WARNING_1); return null; } if (operation == null) { + if(statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1); + } warn("No valid operation found. Skipping.", PigWarning.UDF_WARNING_1); return null; } @@ -115,7 +137,6 @@ public class VespaDocumentOperation extends EvalFunc<String> { Schema inputSchema = getInputSchema(); Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple); String docId = TupleTools.toString(fields, template); - System.out.println(docId); // create json json = create(operation, docId, fields, properties, inputSchema); if (json == null || json.length() == 0) { @@ -125,15 +146,27 @@ public class VespaDocumentOperation extends EvalFunc<String> { } catch (Exception e) { + if (statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters","Document operation failed",1); + } + e.printStackTrace(); + Schema inputSchema = getInputSchema(); + Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple); + String docId = TupleTools.toString(fields, template); + System.out.println(docId); + /* StringBuilder sb = new StringBuilder(); sb.append("Caught exception processing input row: \n"); sb.append(tuple.toString()); sb.append("\nException: "); sb.append(ExceptionUtils.getStackTrace(e)); warn(sb.toString(), PigWarning.UDF_WARNING_1); + */ return null; } - + if (statusReporter != null) { + statusReporter.incrCounter("Vespa Document Operation Counters","Document operation ok",1); + } return json; } @@ -190,8 +223,8 @@ public class VespaDocumentOperation extends EvalFunc<String> { @SuppressWarnings("unchecked") private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth) throws IOException { if (shouldWriteField(name, properties, depth)) { - if (isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties, g, PARTIAL_UPDATE_REMOVE, false) || - isPartialOperation(ADD_BAG_AS_MAP_FIELDS, name, properties, g, PARTIAL_UPDATE_ASSIGN, false)){ + if (isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties) || + isPartialOperation(ADD_BAG_AS_MAP_FIELDS, name, properties)){ schema = (schema != null) ? schema.getField(0).schema : null; // extract the key of map and keys in map for writing json when partial updating maps Schema valueSchema = (schema != null) ? schema.getField(1).schema : null; @@ -208,16 +241,16 @@ public class VespaDocumentOperation extends EvalFunc<String> { Byte t = DataType.findType(v); if (t == DataType.TUPLE) { g.writeFieldName(name + "{" + k + "}"); - if (isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties, g, PARTIAL_UPDATE_REMOVE, false)) { + if (isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties)) { g.writeStartObject(); g.writeFieldName(PARTIAL_UPDATE_REMOVE); g.writeNumber(0); g.writeEndObject(); }else{ if (shouldWritePartialUpdate(op, depth)) { - writePartialUpdate(v, t, g, name, properties, valueSchema, op, depth+1); + writePartialUpdate(v, t, g, name, properties, valueSchema, op, depth); } else { - writeValue(v, t, g, name, properties, valueSchema, op, depth+1); + writeValue(v, t, g, name, properties, valueSchema, op, depth); } } } @@ -278,7 +311,11 @@ public class VespaDocumentOperation extends EvalFunc<String> { g.writeStartObject(); Map<Object, Object> map = (Map<Object, Object>) value; if (shouldCreateTensor(map, name, properties)) { - writeTensor(map, g, isRemoveTensor(name, properties)); + if(isRemoveTensor(name,properties)){ + writeRemoveTensor(map,g); + }else{ + writeTensor(map, g); + } } else { for (Map.Entry<Object, Object> entry : map.entrySet()) { String k = entry.getKey().toString(); @@ -350,29 +387,32 @@ public class VespaDocumentOperation extends EvalFunc<String> { } private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException { + boolean isAssign = true; + g.writeStartObject(); - // look up which operation to do by checking names and their respected properties - if (!isPartialOperation(REMOVE_TENSOR_FIELDS, name, properties, g, PARTIAL_UPDATE_REMOVE, true) - && !isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties, g, PARTIAL_UPDATE_REMOVE, true) - && !isPartialOperation(ADD_TENSOR_FIELDS, name, properties, g, PARTIAL_UPDATE_ADD, true)) { + for (String label: operationMap.keySet()) { + if (properties.getProperty(label) != null) { + String[] p = properties.getProperty(label).split(","); + if (Arrays.asList(p).contains(name)) { + g.writeFieldName(operationMap.get(label)); + isAssign = false; + } + } + } + if (isAssign) { g.writeFieldName(PARTIAL_UPDATE_ASSIGN); } writeValue(value, type, g, name, properties, schema, op, depth); g.writeEndObject(); - - } - private static boolean isPartialOperation(String label, String name, Properties properties, JsonGenerator g, String targetOperation, boolean writeFieldName) throws IOException{ + private static boolean isPartialOperation(String label, String name, Properties properties) { // when dealing with partial update operations, write the desired operation // writeFieldName decides if a field name should be written when checking boolean isPartialOperation = false; if (properties.getProperty(label) != null) { String[] p = properties.getProperty(label).split(","); if (Arrays.asList(p).contains(name)) { - if (writeFieldName) { - g.writeFieldName(targetOperation); - } isPartialOperation = true; } } @@ -529,70 +569,71 @@ public class VespaDocumentOperation extends EvalFunc<String> { return true; } - private static void writeTensor(Map<Object, Object> map, JsonGenerator g, Boolean isRemoveTensor) throws IOException { - if (!isRemoveTensor){ - g.writeFieldName("cells"); - }else{ - g.writeFieldName("address"); - } + private static void writeTensor(Map<Object, Object> map, JsonGenerator g) throws IOException { + g.writeFieldName("cells"); g.writeStartArray(); for (Map.Entry<Object, Object> entry : map.entrySet()) { String k = entry.getKey().toString(); Double v = Double.parseDouble(entry.getValue().toString()); - // Write address - if (!isRemoveTensor){ - - g.writeStartObject(); + g.writeStartObject(); - g.writeFieldName("address"); - g.writeStartObject(); + // Write address + g.writeFieldName("address"); + g.writeStartObject(); - String[] dimensions = k.split(","); - for (String dimension : dimensions) { - if (dimension == null || dimension.isEmpty()) { - continue; - } - String[] address = dimension.split(":"); - if (address.length != 2) { - throw new IllegalArgumentException("Malformed cell address: " + dimension); - } - String dim = address[0]; - String label = address[1]; - if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) { - throw new IllegalArgumentException("Malformed cell address: " + dimension); - } - g.writeFieldName(dim.trim()); - g.writeString(label.trim()); + String[] dimensions = k.split(","); + for (String dimension : dimensions) { + if (dimension == null || dimension.isEmpty()) { + continue; } - g.writeEndObject(); + String[] address = dimension.split(":"); + if (address.length != 2) { + throw new IllegalArgumentException("Malformed cell address: " + dimension); + } + String dim = address[0]; + String label = address[1]; + if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) { + throw new IllegalArgumentException("Malformed cell address: " + dimension); + } + g.writeFieldName(dim.trim()); + g.writeString(label.trim()); + } + g.writeEndObject(); - // Write value - g.writeFieldName("value"); - g.writeNumber(v); + // Write value + g.writeFieldName("value"); + g.writeNumber(v); - g.writeEndObject(); + g.writeEndObject(); + } + g.writeEndArray(); + } - }else{ - String[] dimensions = k.split(","); - for (String dimension : dimensions) { - g.writeStartObject(); - if (dimension == null || dimension.isEmpty()) { - continue; - } - String[] address = dimension.split(":"); - if (address.length != 2) { - throw new IllegalArgumentException("Malformed cell address: " + dimension); - } - String dim = address[0]; - String label = address[1]; - if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) { - throw new IllegalArgumentException("Malformed cell address: " + dimension); - } - g.writeFieldName(dim.trim()); - g.writeString(label.trim()); - g.writeEndObject(); + private static void writeRemoveTensor(Map<Object, Object> map, JsonGenerator g) throws IOException { + g.writeFieldName("addresses"); + g.writeStartArray(); + for (Map.Entry<Object, Object> entry : map.entrySet()) { + String k = entry.getKey().toString(); + String[] dimensions = k.split(","); + for (String dimension : dimensions) { + g.writeStartObject(); + if (dimension == null || dimension.isEmpty()) { + continue; } + String[] address = dimension.split(":"); + if (address.length != 2) { + throw new IllegalArgumentException("Malformed cell address: " + dimension); + } + String dim = address[0]; + String label = address[1]; + if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) { + throw new IllegalArgumentException("Malformed cell address: " + dimension); + } + g.writeFieldName(dim.trim()); + g.writeString(label.trim()); + g.writeEndObject(); + // Write address } } g.writeEndArray(); |