about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author jingtinghuang <r06944049@ntu.edu.tw> 2020-03-31 14:53:18 +0800
committer jingtinghuang <r06944049@ntu.edu.tw> 2020-03-31 14:53:18 +0800
commit 4788d1fdaae9a49f4cde171066f589cc0d5e3f4e (patch)
tree d3076fc6c483f9d0ac26ee08c3e633498cb8b962
parent a3756480c7886ad029db6f2fbfaacb088d3ffee1 (diff)
pr review
-rw-r--r-- vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java 187
1 files changed, 114 insertions, 73 deletions
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
index 9427ae962d1..b7c58fe968c 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
@@ -6,7 +6,7 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import org.apache.commons.lang.exception.ExceptionUtils;
+//import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigWarning;
import org.apache.pig.data.DataBag;
@@ -14,8 +14,10 @@ import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
import org.joda.time.DateTime;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
@@ -74,18 +76,29 @@ public class VespaDocumentOperation extends EvalFunc<String> {
private static final String ADD_BAG_AS_MAP_FIELDS = "add-bag-as-map-fields";
private static final String EXCLUDE_FIELDS = "exclude-fields";
private static final String TESTSET_CONDITION = "condition";
-
-
private static final String PARTIAL_UPDATE_ASSIGN = "assign";
private static final String PARTIAL_UPDATE_ADD = "add";
private static final String PARTIAL_UPDATE_REMOVE = "remove";
+ private static Map<String, String> operationMap;
+ static {
+ operationMap = new HashMap<>();
+ operationMap.put(REMOVE_TENSOR_FIELDS, PARTIAL_UPDATE_REMOVE);
+ operationMap.put(REMOVE_BAG_AS_MAP_FIELDS, PARTIAL_UPDATE_REMOVE);
+ operationMap.put(ADD_TENSOR_FIELDS, PARTIAL_UPDATE_ADD);
+ }
private final String template;
private final Operation operation;
private final Properties properties;
+ private PigStatusReporter statusReporter;
public VespaDocumentOperation(String... params) {
+ statusReporter = PigStatusReporter.getInstance();
+ if(statusReporter != null){
+ statusReporter.incrCounter("Vespa Document Operation Counters","Document operation ok",0);
+ statusReporter.incrCounter("Vespa Document Operation Counters","Document operation failed",0);
+ }
properties = VespaConfiguration.loadProperties(params);
template = properties.getProperty(PROPERTY_ID_TEMPLATE);
operation = Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put"));
@@ -94,13 +107,22 @@ public class VespaDocumentOperation extends EvalFunc<String> {
@Override
public String exec(Tuple tuple) throws IOException {
if (tuple == null || tuple.size() == 0) {
+ if(statusReporter != null) {
+ statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1);
+ }
return null;
}
if (template == null || template.length() == 0) {
+ if(statusReporter != null) {
+ statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1);
+ }
warn("No valid document id template found. Skipping.", PigWarning.UDF_WARNING_1);
return null;
}
if (operation == null) {
+ if(statusReporter != null) {
+ statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1);
+ }
warn("No valid operation found. Skipping.", PigWarning.UDF_WARNING_1);
return null;
}
@@ -115,7 +137,6 @@ public class VespaDocumentOperation extends EvalFunc<String> {
Schema inputSchema = getInputSchema();
Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple);
String docId = TupleTools.toString(fields, template);
- System.out.println(docId);
// create json
json = create(operation, docId, fields, properties, inputSchema);
if (json == null || json.length() == 0) {
@@ -125,15 +146,27 @@ public class VespaDocumentOperation extends EvalFunc<String> {
} catch (Exception e) {
+ if (statusReporter != null) {
+ statusReporter.incrCounter("Vespa Document Operation Counters","Document operation failed",1);
+ }
+ e.printStackTrace();
+ Schema inputSchema = getInputSchema();
+ Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple);
+ String docId = TupleTools.toString(fields, template);
+ System.out.println(docId);
+ /*
StringBuilder sb = new StringBuilder();
sb.append("Caught exception processing input row: \n");
sb.append(tuple.toString());
sb.append("\nException: ");
sb.append(ExceptionUtils.getStackTrace(e));
warn(sb.toString(), PigWarning.UDF_WARNING_1);
+ */
return null;
}
-
+ if (statusReporter != null) {
+ statusReporter.incrCounter("Vespa Document Operation Counters","Document operation ok",1);
+ }
return json;
}
@@ -190,8 +223,8 @@ public class VespaDocumentOperation extends EvalFunc<String> {
@SuppressWarnings("unchecked")
private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth) throws IOException {
if (shouldWriteField(name, properties, depth)) {
- if (isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties, g, PARTIAL_UPDATE_REMOVE, false) ||
- isPartialOperation(ADD_BAG_AS_MAP_FIELDS, name, properties, g, PARTIAL_UPDATE_ASSIGN, false)){
+ if (isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties) ||
+ isPartialOperation(ADD_BAG_AS_MAP_FIELDS, name, properties)){
schema = (schema != null) ? schema.getField(0).schema : null;
// extract the key of map and keys in map for writing json when partial updating maps
Schema valueSchema = (schema != null) ? schema.getField(1).schema : null;
@@ -208,16 +241,16 @@ public class VespaDocumentOperation extends EvalFunc<String> {
Byte t = DataType.findType(v);
if (t == DataType.TUPLE) {
g.writeFieldName(name + "{" + k + "}");
- if (isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties, g, PARTIAL_UPDATE_REMOVE, false)) {
+ if (isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties)) {
g.writeStartObject();
g.writeFieldName(PARTIAL_UPDATE_REMOVE);
g.writeNumber(0);
g.writeEndObject();
}else{
if (shouldWritePartialUpdate(op, depth)) {
- writePartialUpdate(v, t, g, name, properties, valueSchema, op, depth+1);
+ writePartialUpdate(v, t, g, name, properties, valueSchema, op, depth);
} else {
- writeValue(v, t, g, name, properties, valueSchema, op, depth+1);
+ writeValue(v, t, g, name, properties, valueSchema, op, depth);
}
}
}
@@ -278,7 +311,11 @@ public class VespaDocumentOperation extends EvalFunc<String> {
g.writeStartObject();
Map<Object, Object> map = (Map<Object, Object>) value;
if (shouldCreateTensor(map, name, properties)) {
- writeTensor(map, g, isRemoveTensor(name, properties));
+ if(isRemoveTensor(name,properties)){
+ writeRemoveTensor(map,g);
+ }else{
+ writeTensor(map, g);
+ }
} else {
for (Map.Entry<Object, Object> entry : map.entrySet()) {
String k = entry.getKey().toString();
@@ -350,29 +387,32 @@ public class VespaDocumentOperation extends EvalFunc<String> {
}
private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException {
+ boolean isAssign = true;
+
g.writeStartObject();
- // look up which operation to do by checking names and their respected properties
- if (!isPartialOperation(REMOVE_TENSOR_FIELDS, name, properties, g, PARTIAL_UPDATE_REMOVE, true)
- && !isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties, g, PARTIAL_UPDATE_REMOVE, true)
- && !isPartialOperation(ADD_TENSOR_FIELDS, name, properties, g, PARTIAL_UPDATE_ADD, true)) {
+ for (String label: operationMap.keySet()) {
+ if (properties.getProperty(label) != null) {
+ String[] p = properties.getProperty(label).split(",");
+ if (Arrays.asList(p).contains(name)) {
+ g.writeFieldName(operationMap.get(label));
+ isAssign = false;
+ }
+ }
+ }
+ if (isAssign) {
g.writeFieldName(PARTIAL_UPDATE_ASSIGN);
}
writeValue(value, type, g, name, properties, schema, op, depth);
g.writeEndObject();
-
-
}
- private static boolean isPartialOperation(String label, String name, Properties properties, JsonGenerator g, String targetOperation, boolean writeFieldName) throws IOException{
+ private static boolean isPartialOperation(String label, String name, Properties properties) {
// when dealing with partial update operations, write the desired operation
// writeFieldName decides if a field name should be written when checking
boolean isPartialOperation = false;
if (properties.getProperty(label) != null) {
String[] p = properties.getProperty(label).split(",");
if (Arrays.asList(p).contains(name)) {
- if (writeFieldName) {
- g.writeFieldName(targetOperation);
- }
isPartialOperation = true;
}
}
@@ -529,70 +569,71 @@ public class VespaDocumentOperation extends EvalFunc<String> {
return true;
}
- private static void writeTensor(Map<Object, Object> map, JsonGenerator g, Boolean isRemoveTensor) throws IOException {
- if (!isRemoveTensor){
- g.writeFieldName("cells");
- }else{
- g.writeFieldName("address");
- }
+ private static void writeTensor(Map<Object, Object> map, JsonGenerator g) throws IOException {
+ g.writeFieldName("cells");
g.writeStartArray();
for (Map.Entry<Object, Object> entry : map.entrySet()) {
String k = entry.getKey().toString();
Double v = Double.parseDouble(entry.getValue().toString());
- // Write address
- if (!isRemoveTensor){
-
- g.writeStartObject();
+ g.writeStartObject();
- g.writeFieldName("address");
- g.writeStartObject();
+ // Write address
+ g.writeFieldName("address");
+ g.writeStartObject();
- String[] dimensions = k.split(",");
- for (String dimension : dimensions) {
- if (dimension == null || dimension.isEmpty()) {
- continue;
- }
- String[] address = dimension.split(":");
- if (address.length != 2) {
- throw new IllegalArgumentException("Malformed cell address: " + dimension);
- }
- String dim = address[0];
- String label = address[1];
- if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) {
- throw new IllegalArgumentException("Malformed cell address: " + dimension);
- }
- g.writeFieldName(dim.trim());
- g.writeString(label.trim());
+ String[] dimensions = k.split(",");
+ for (String dimension : dimensions) {
+ if (dimension == null || dimension.isEmpty()) {
+ continue;
}
- g.writeEndObject();
+ String[] address = dimension.split(":");
+ if (address.length != 2) {
+ throw new IllegalArgumentException("Malformed cell address: " + dimension);
+ }
+ String dim = address[0];
+ String label = address[1];
+ if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) {
+ throw new IllegalArgumentException("Malformed cell address: " + dimension);
+ }
+ g.writeFieldName(dim.trim());
+ g.writeString(label.trim());
+ }
+ g.writeEndObject();
- // Write value
- g.writeFieldName("value");
- g.writeNumber(v);
+ // Write value
+ g.writeFieldName("value");
+ g.writeNumber(v);
- g.writeEndObject();
+ g.writeEndObject();
+ }
+ g.writeEndArray();
+ }
- }else{
- String[] dimensions = k.split(",");
- for (String dimension : dimensions) {
- g.writeStartObject();
- if (dimension == null || dimension.isEmpty()) {
- continue;
- }
- String[] address = dimension.split(":");
- if (address.length != 2) {
- throw new IllegalArgumentException("Malformed cell address: " + dimension);
- }
- String dim = address[0];
- String label = address[1];
- if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) {
- throw new IllegalArgumentException("Malformed cell address: " + dimension);
- }
- g.writeFieldName(dim.trim());
- g.writeString(label.trim());
- g.writeEndObject();
+ private static void writeRemoveTensor(Map<Object, Object> map, JsonGenerator g) throws IOException {
+ g.writeFieldName("addresses");
+ g.writeStartArray();
+ for (Map.Entry<Object, Object> entry : map.entrySet()) {
+ String k = entry.getKey().toString();
+ String[] dimensions = k.split(",");
+ for (String dimension : dimensions) {
+ g.writeStartObject();
+ if (dimension == null || dimension.isEmpty()) {
+ continue;
}
+ String[] address = dimension.split(":");
+ if (address.length != 2) {
+ throw new IllegalArgumentException("Malformed cell address: " + dimension);
+ }
+ String dim = address[0];
+ String label = address[1];
+ if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) {
+ throw new IllegalArgumentException("Malformed cell address: " + dimension);
+ }
+ g.writeFieldName(dim.trim());
+ g.writeString(label.trim());
+ g.writeEndObject();
+ // Write address
}
}
g.writeEndArray();