summaryrefslogtreecommitdiffstats
path: root/vespa-hadoop
diff options
context:
space:
mode:
authorjingtinghuang <r06944049@ntu.edu.tw>2020-03-31 17:39:04 +0800
committerjingtinghuang <r06944049@ntu.edu.tw>2020-03-31 17:39:04 +0800
commitd254707e6a9f5555321c108996cfd7b61c55f23d (patch)
tree59651ebf98a93219b35e2aed9d4717a655cb356e /vespa-hadoop
parent4788d1fdaae9a49f4cde171066f589cc0d5e3f4e (diff)
pr fix and add counter
Diffstat (limited to 'vespa-hadoop')
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java149
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java9
2 files changed, 87 insertions, 71 deletions
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
index b7c58fe968c..b81b0e732b0 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
@@ -6,18 +6,17 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-//import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigWarning;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.tools.pigstats.PigStatusReporter;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.joda.time.DateTime;
-
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
@@ -71,21 +70,29 @@ public class VespaDocumentOperation extends EvalFunc<String> {
private static final String SIMPLE_OBJECT_FIELDS = "simple-object-fields";
private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields";
private static final String REMOVE_TENSOR_FIELDS = "remove-tensor-fields";
- private static final String ADD_TENSOR_FIELDS = "add-tensor-fields";
- private static final String REMOVE_BAG_AS_MAP_FIELDS = "remove-bag-as-map-fields";
- private static final String ADD_BAG_AS_MAP_FIELDS = "add-bag-as-map-fields";
+ private static final String UPDATE_TENSOR_FIELDS = "update-tensor-fields";
+ private static final String REMOVE_MAP_FIELDS = "remove-map-fields";
+ private static final String UPDATE_MAP_FIELDS = "update-map-fields";
private static final String EXCLUDE_FIELDS = "exclude-fields";
private static final String TESTSET_CONDITION = "condition";
private static final String PARTIAL_UPDATE_ASSIGN = "assign";
private static final String PARTIAL_UPDATE_ADD = "add";
private static final String PARTIAL_UPDATE_REMOVE = "remove";
- private static Map<String, String> operationMap;
+ private static Map<String, String> mapPartialOperationMap;
+ static {
+ mapPartialOperationMap = new HashMap<>();
+ mapPartialOperationMap.put(REMOVE_MAP_FIELDS, PARTIAL_UPDATE_REMOVE);
+ mapPartialOperationMap.put(UPDATE_MAP_FIELDS, PARTIAL_UPDATE_ASSIGN);
+ }
+
+ private static Map<String, String> partialOperationMap;
static {
- operationMap = new HashMap<>();
- operationMap.put(REMOVE_TENSOR_FIELDS, PARTIAL_UPDATE_REMOVE);
- operationMap.put(REMOVE_BAG_AS_MAP_FIELDS, PARTIAL_UPDATE_REMOVE);
- operationMap.put(ADD_TENSOR_FIELDS, PARTIAL_UPDATE_ADD);
+ partialOperationMap = new HashMap<>();
+ partialOperationMap.put(REMOVE_TENSOR_FIELDS, PARTIAL_UPDATE_REMOVE);
+ partialOperationMap.put(UPDATE_TENSOR_FIELDS, PARTIAL_UPDATE_ADD);
+ partialOperationMap.put(REMOVE_MAP_FIELDS, PARTIAL_UPDATE_REMOVE);
+ partialOperationMap.put(UPDATE_MAP_FIELDS, PARTIAL_UPDATE_ASSIGN);
}
private final String template;
@@ -149,19 +156,12 @@ public class VespaDocumentOperation extends EvalFunc<String> {
if (statusReporter != null) {
statusReporter.incrCounter("Vespa Document Operation Counters","Document operation failed",1);
}
- e.printStackTrace();
- Schema inputSchema = getInputSchema();
- Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple);
- String docId = TupleTools.toString(fields, template);
- System.out.println(docId);
- /*
StringBuilder sb = new StringBuilder();
sb.append("Caught exception processing input row: \n");
sb.append(tuple.toString());
sb.append("\nException: ");
sb.append(ExceptionUtils.getStackTrace(e));
warn(sb.toString(), PigWarning.UDF_WARNING_1);
- */
return null;
}
if (statusReporter != null) {
@@ -219,42 +219,35 @@ public class VespaDocumentOperation extends EvalFunc<String> {
return out.toString();
}
+ private static String getPartialOperation(Map<String, String> operationMap, String name, Properties properties) {
+        // This function checks whether the property of the given name falls into the provided map.
+        // If it does, return the desired operation; if not, return null.
+ // for example, input:
+ // operationMap map{"update-map-fields":"assign","remove-map-fields":"remove"}
+ // name date
+ // properties "update-map-fields":"date,month"
+ // output: assign
+ for (String label: operationMap.keySet()) {
+ if (properties.getProperty(label) != null) {
+ String[] p = properties.getProperty(label).split(",");
+ if (Arrays.asList(p).contains(name)) {
+ return operationMap.get(label);
+ }
+ }
+ }
+ return null;
+ }
@SuppressWarnings("unchecked")
private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth) throws IOException {
if (shouldWriteField(name, properties, depth)) {
- if (isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties) ||
- isPartialOperation(ADD_BAG_AS_MAP_FIELDS, name, properties)){
- schema = (schema != null) ? schema.getField(0).schema : null;
- // extract the key of map and keys in map for writing json when partial updating maps
- Schema valueSchema = (schema != null) ? schema.getField(1).schema : null;
- // data format { ( key; id, value: (abc,123,(123234,bbaa))) }
- // the first element of each tuple in the bag will be the map to update
- // the second element of each tuple in the bag will be the new value of the map
- DataBag bag = (DataBag) value;
- for (Tuple element : bag) {
- if (element.size() != 2) {
- continue;
- }
- String k = (String) element.get(0);
- Object v = element.get(1);
- Byte t = DataType.findType(v);
- if (t == DataType.TUPLE) {
- g.writeFieldName(name + "{" + k + "}");
- if (isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties)) {
- g.writeStartObject();
- g.writeFieldName(PARTIAL_UPDATE_REMOVE);
- g.writeNumber(0);
- g.writeEndObject();
- }else{
- if (shouldWritePartialUpdate(op, depth)) {
- writePartialUpdate(v, t, g, name, properties, valueSchema, op, depth);
- } else {
- writeValue(v, t, g, name, properties, valueSchema, op, depth);
- }
- }
- }
- }
+ String operation = getPartialOperation(mapPartialOperationMap, name, properties);
+ // check if the name has the property update-map-fields/remove-map-fields
+ // if yes, we need special treatments here as we need to loop through the tuple
+            // be aware that the operation here is not a Vespa operation such as "put" and "update";
+            // the operations here are the field names we wish to use, such as "assign" and "remove"
+ if (operation != null) {
+ writePartialUpdateAndRemoveMap(name ,value, g, properties, schema, op, depth, operation);
}else{
g.writeFieldName(name);
if (shouldWritePartialUpdate(op, depth)) {
@@ -267,6 +260,35 @@ public class VespaDocumentOperation extends EvalFunc<String> {
}
}
+ private static void writePartialUpdateAndRemoveMap(String name, Object value, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth, String operation) throws IOException {
+ schema = (schema != null) ? schema.getField(0).schema : null;
+ // extract the key of map and keys in map for writing json when partial updating maps
+ Schema valueSchema = (schema != null) ? schema.getField(1).schema : null;
+ // data format { ( key; id, value: (abc,123,(123234,bbaa))) }
+ // the first element of each tuple in the bag will be the map to update
+ // the second element of each tuple in the bag will be the new value of the map
+ DataBag bag = (DataBag) value;
+ for (Tuple element : bag) {
+ if (element.size() != 2) {
+ continue;
+ }
+ String k = (String) element.get(0);
+ Object v = element.get(1);
+ Byte t = DataType.findType(v);
+ if (t == DataType.TUPLE) {
+ g.writeFieldName(name + "{" + k + "}");
+ if (operation.equals(PARTIAL_UPDATE_REMOVE)) {
+ g.writeStartObject();
+ g.writeFieldName(PARTIAL_UPDATE_REMOVE);
+ g.writeNumber(0);
+ g.writeEndObject();
+ }else{
+ writePartialUpdate(v, t, g, name, properties, valueSchema, op, depth);
+ }
+ }
+ }
+ }
+
@SuppressWarnings("unchecked")
private static void writeValue(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException {
switch (type) {
@@ -312,7 +334,7 @@ public class VespaDocumentOperation extends EvalFunc<String> {
Map<Object, Object> map = (Map<Object, Object>) value;
if (shouldCreateTensor(map, name, properties)) {
if(isRemoveTensor(name,properties)){
- writeRemoveTensor(map,g);
+ writeRemoveTensor(map, g);
}else{
writeTensor(map, g);
}
@@ -387,19 +409,14 @@ public class VespaDocumentOperation extends EvalFunc<String> {
}
private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException {
- boolean isAssign = true;
-
g.writeStartObject();
- for (String label: operationMap.keySet()) {
- if (properties.getProperty(label) != null) {
- String[] p = properties.getProperty(label).split(",");
- if (Arrays.asList(p).contains(name)) {
- g.writeFieldName(operationMap.get(label));
- isAssign = false;
- }
- }
- }
- if (isAssign) {
+        // here we check if the operation falls into the four partial operations we do on map/tensor structures
+        // if not, we assume it's an update of the whole document and we write assign here
+ // if yes, we write the desired operation here
+ String operation = getPartialOperation(partialOperationMap, name, properties);
+ if (operation != null) {
+ g.writeFieldName(operation);
+ }else{
g.writeFieldName(PARTIAL_UPDATE_ASSIGN);
}
writeValue(value, type, g, name, properties, schema, op, depth);
@@ -440,12 +457,12 @@ public class VespaDocumentOperation extends EvalFunc<String> {
}
private static boolean shouldWriteTupleAsMap(String name, Properties properties) {
- // include ADD_BAG_AS_MAP_FIELDS here because when updating the map
+ // include UPDATE_MAP_FIELDS here because when updating the map
// the second element in each tuple should be written as a map
if (properties == null) {
return false;
}
- String addBagAsMapFields = properties.getProperty(ADD_BAG_AS_MAP_FIELDS);
+ String addBagAsMapFields = properties.getProperty(UPDATE_MAP_FIELDS);
String simpleObjectFields = properties.getProperty(SIMPLE_OBJECT_FIELDS);
if (simpleObjectFields == null && addBagAsMapFields == null) {
return false;
@@ -501,7 +518,7 @@ public class VespaDocumentOperation extends EvalFunc<String> {
return false;
}
String createTensorFields = properties.getProperty(CREATE_TENSOR_FIELDS);
- String addTensorFields = properties.getProperty(ADD_TENSOR_FIELDS);
+ String addTensorFields = properties.getProperty(UPDATE_TENSOR_FIELDS);
String removeTensorFields = properties.getProperty(REMOVE_TENSOR_FIELDS);
if (createTensorFields == null && addTensorFields == null && removeTensorFields == null) {
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
index b86e927e914..72d0a2ec069 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hadoop.pig;
-import com.google.gson.JsonArray;
import org.apache.pig.data.*;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -120,7 +119,7 @@ public class VespaDocumentOperationTest {
addToTuple("bag", DataType.BAG, bag, bagSchema, schema, tuple);
addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
- VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "remove-bag-as-map-fields=bag","operation=update");
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "remove-map-fields=bag","operation=update");
docOp.setInputSchema(schema);
String json = docOp.exec(tuple);
@@ -154,7 +153,7 @@ public class VespaDocumentOperationTest {
Tuple tuple = TupleFactory.getInstance().newTuple();
addToTuple("bag", DataType.BAG, bag, bagSchema, schema, tuple);
addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
- VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "add-bag-as-map-fields=bag","operation=update");
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "update-map-fields=bag","operation=update");
docOp.setInputSchema(schema);
String json = docOp.exec(tuple);
@@ -184,7 +183,7 @@ public class VespaDocumentOperationTest {
addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
addToTuple("tensor", DataType.MAP, tensor, schema, tuple);
- VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "add-tensor-fields=tensor","operation=update");
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "update-tensor-fields=tensor","operation=update");
docOp.setInputSchema(schema);
String json = docOp.exec(tuple);
@@ -232,7 +231,7 @@ public class VespaDocumentOperationTest {
JsonNode fields = root.get("fields");
JsonNode tensorValue = fields.get("tensor");
JsonNode remove = tensorValue.get("remove");
- JsonNode address = remove.get("address");
+ JsonNode address = remove.get("addresses");
Iterator<JsonNode> addressIterator = address.getElements();