From d254707e6a9f5555321c108996cfd7b61c55f23d Mon Sep 17 00:00:00 2001
From: jingtinghuang
Date: Tue, 31 Mar 2020 17:39:04 +0800
Subject: pr fix and add counter

---
 .../vespa/hadoop/pig/VespaDocumentOperation.java   | 149 ++++++++++++----
 .../hadoop/pig/VespaDocumentOperationTest.java     |   9 +-
 2 files changed, 87 insertions(+), 71 deletions(-)

(limited to 'vespa-hadoop/src')

diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
index b7c58fe968c..b81b0e732b0 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
@@ -6,18 +6,17 @@
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
 import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-//import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigWarning;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.joda.time.DateTime;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -71,21 +70,29 @@ public class VespaDocumentOperation extends EvalFunc<String> {
     private static final String SIMPLE_OBJECT_FIELDS = "simple-object-fields";
     private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields";
     private static final String REMOVE_TENSOR_FIELDS = "remove-tensor-fields";
-    private static final String ADD_TENSOR_FIELDS = "add-tensor-fields";
-    private static final String REMOVE_BAG_AS_MAP_FIELDS = "remove-bag-as-map-fields";
-    private static final String ADD_BAG_AS_MAP_FIELDS = "add-bag-as-map-fields";
+    private static final String UPDATE_TENSOR_FIELDS = "update-tensor-fields";
+    private static final String REMOVE_MAP_FIELDS = "remove-map-fields";
+    private static final String UPDATE_MAP_FIELDS = "update-map-fields";
     private static final String EXCLUDE_FIELDS = "exclude-fields";
     private static final String TESTSET_CONDITION = "condition";
     private static final String PARTIAL_UPDATE_ASSIGN = "assign";
     private static final String PARTIAL_UPDATE_ADD = "add";
     private static final String PARTIAL_UPDATE_REMOVE = "remove";
 
-    private static Map<String, String> operationMap;
+    private static Map<String, String> mapPartialOperationMap;
+    static {
+        mapPartialOperationMap = new HashMap<>();
+        mapPartialOperationMap.put(REMOVE_MAP_FIELDS, PARTIAL_UPDATE_REMOVE);
+        mapPartialOperationMap.put(UPDATE_MAP_FIELDS, PARTIAL_UPDATE_ASSIGN);
+    }
+
+    private static Map<String, String> partialOperationMap;
     static {
-        operationMap = new HashMap<>();
-        operationMap.put(REMOVE_TENSOR_FIELDS, PARTIAL_UPDATE_REMOVE);
-        operationMap.put(REMOVE_BAG_AS_MAP_FIELDS, PARTIAL_UPDATE_REMOVE);
-        operationMap.put(ADD_TENSOR_FIELDS, PARTIAL_UPDATE_ADD);
+        partialOperationMap = new HashMap<>();
+        partialOperationMap.put(REMOVE_TENSOR_FIELDS, PARTIAL_UPDATE_REMOVE);
+        partialOperationMap.put(UPDATE_TENSOR_FIELDS, PARTIAL_UPDATE_ADD);
+        partialOperationMap.put(REMOVE_MAP_FIELDS, PARTIAL_UPDATE_REMOVE);
+        partialOperationMap.put(UPDATE_MAP_FIELDS, PARTIAL_UPDATE_ASSIGN);
     }
 
     private final String template;
@@ -149,19 +156,12 @@ public class VespaDocumentOperation extends EvalFunc<String> {
             if (statusReporter != null) {
                 statusReporter.incrCounter("Vespa Document Operation Counters","Document operation failed",1);
             }
-            e.printStackTrace();
-            Schema inputSchema = getInputSchema();
-            Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple);
-            String docId = TupleTools.toString(fields, template);
-            System.out.println(docId);
-            /*
             StringBuilder sb = new StringBuilder();
             sb.append("Caught exception processing input row: \n");
             sb.append(tuple.toString());
             sb.append("\nException: ");
             sb.append(ExceptionUtils.getStackTrace(e));
             warn(sb.toString(), PigWarning.UDF_WARNING_1);
-            */
             return null;
         }
         if (statusReporter != null) {
@@ -219,42 +219,35 @@ public class VespaDocumentOperation extends EvalFunc<String> {
         return out.toString();
     }
 
+    private static String getPartialOperation(Map<String, String> operationMap, String name, Properties properties) {
+        // Checks whether the given field name is listed under one of the property names in operationMap.
+        // If it is, the mapped partial operation is returned; otherwise null is returned.
+        // For example, input:
+        //     operationMap  {"update-map-fields": "assign", "remove-map-fields": "remove"}
+        //     name          date
+        //     properties    "update-map-fields": "date,month"
+        // output: assign
+        for (String label: operationMap.keySet()) {
+            if (properties.getProperty(label) != null) {
+                String[] p = properties.getProperty(label).split(",");
+                if (Arrays.asList(p).contains(name)) {
+                    return operationMap.get(label);
+                }
+            }
+        }
+        return null;
+    }
 
     @SuppressWarnings("unchecked")
     private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth) throws IOException {
         if (shouldWriteField(name, properties, depth)) {
-            if (isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties) ||
-                    isPartialOperation(ADD_BAG_AS_MAP_FIELDS, name, properties)){
-                schema = (schema != null) ? schema.getField(0).schema : null;
-                // extract the key of map and keys in map for writing json when partial updating maps
-                Schema valueSchema = (schema != null) ? schema.getField(1).schema : null;
-                // data format { ( key; id, value: (abc,123,(123234,bbaa))) }
-                // the first element of each tuple in the bag will be the map to update
-                // the second element of each tuple in the bag will be the new value of the map
-                DataBag bag = (DataBag) value;
-                for (Tuple element : bag) {
-                    if (element.size() != 2) {
-                        continue;
-                    }
-                    String k = (String) element.get(0);
-                    Object v = element.get(1);
-                    Byte t = DataType.findType(v);
-                    if (t == DataType.TUPLE) {
-                        g.writeFieldName(name + "{" + k + "}");
-                        if (isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties)) {
-                            g.writeStartObject();
-                            g.writeFieldName(PARTIAL_UPDATE_REMOVE);
-                            g.writeNumber(0);
-                            g.writeEndObject();
-                        }else{
-                            if (shouldWritePartialUpdate(op, depth)) {
-                                writePartialUpdate(v, t, g, name, properties, valueSchema, op, depth);
-                            } else {
-                                writeValue(v, t, g, name, properties, valueSchema, op, depth);
-                            }
-                        }
-                    }
-                }
+            String operation = getPartialOperation(mapPartialOperationMap, name, properties);
+            // Check whether the field name is covered by update-map-fields or remove-map-fields.
+            // If it is, it needs special treatment here, since we have to loop through the tuples in the bag.
+            // Note that the operation here is not a Vespa document operation such as "put" or "update",
+            // but the partial-update operation to write for the field, such as "assign" or "remove".
+            if (operation != null) {
+                writePartialUpdateAndRemoveMap(name, value, g, properties, schema, op, depth, operation);
             }else{
                 g.writeFieldName(name);
                 if (shouldWritePartialUpdate(op, depth)) {
@@ -267,6 +260,35 @@ public class VespaDocumentOperation extends EvalFunc<String> {
         }
     }
 
+    private static void writePartialUpdateAndRemoveMap(String name, Object value, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth, String operation) throws IOException {
+        schema = (schema != null) ? schema.getField(0).schema : null;
+        // extract the key of the map and the keys in the map for writing json when partial updating maps
+        Schema valueSchema = (schema != null) ? schema.getField(1).schema : null;
+        // data format: { (key: id, value: (abc,123,(123234,bbaa))) }
+        // the first element of each tuple in the bag is the key of the map entry to update
+        // the second element of each tuple in the bag is the new value for that entry
+        DataBag bag = (DataBag) value;
+        for (Tuple element : bag) {
+            if (element.size() != 2) {
+                continue;
+            }
+            String k = (String) element.get(0);
+            Object v = element.get(1);
+            Byte t = DataType.findType(v);
+            if (t == DataType.TUPLE) {
+                g.writeFieldName(name + "{" + k + "}");
+                if (operation.equals(PARTIAL_UPDATE_REMOVE)) {
+                    g.writeStartObject();
+                    g.writeFieldName(PARTIAL_UPDATE_REMOVE);
+                    g.writeNumber(0);
+                    g.writeEndObject();
+                } else {
+                    writePartialUpdate(v, t, g, name, properties, valueSchema, op, depth);
+                }
+            }
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private static void writeValue(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException {
         switch (type) {
@@ -312,7 +334,7 @@ public class VespaDocumentOperation extends EvalFunc<String> {
                 Map<Object, Object> map = (Map<Object, Object>) value;
                 if (shouldCreateTensor(map, name, properties)) {
                     if(isRemoveTensor(name,properties)){
-                        writeRemoveTensor(map,g);
+                        writeRemoveTensor(map, g);
                     }else{
                         writeTensor(map, g);
                     }
@@ -387,19 +409,14 @@ public class VespaDocumentOperation extends EvalFunc<String> {
 
     private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException {
-        boolean isAssign = true;
-        g.writeStartObject();
-        for (String label: operationMap.keySet()) {
-            if (properties.getProperty(label) != null) {
-                String[] p = properties.getProperty(label).split(",");
-                if (Arrays.asList(p).contains(name)) {
-                    g.writeFieldName(operationMap.get(label));
-                    isAssign = false;
-                }
-            }
-        }
-        if (isAssign) {
+        // Check whether the field falls into one of the four partial operations we do on map/tensor structures.
+        // If it does not, we assume the whole field is being updated and write "assign" here;
+        // if it does, we write the mapped operation here.
+        String operation = getPartialOperation(partialOperationMap, name, properties);
+        if (operation != null) {
+            g.writeFieldName(operation);
+        } else {
             g.writeFieldName(PARTIAL_UPDATE_ASSIGN);
         }
         writeValue(value, type, g, name, properties, schema, op, depth);
@@ -440,12 +457,12 @@ public class VespaDocumentOperation extends EvalFunc<String> {
     }
 
     private static boolean shouldWriteTupleAsMap(String name, Properties properties) {
-        // include ADD_BAG_AS_MAP_FIELDS here because when updating the map
+        // include UPDATE_MAP_FIELDS here because when updating the map
         // the second element in each tuple should be written as a map
         if (properties == null) {
            return false;
        }
-        String addBagAsMapFields = properties.getProperty(ADD_BAG_AS_MAP_FIELDS);
+        String addBagAsMapFields = properties.getProperty(UPDATE_MAP_FIELDS);
         String simpleObjectFields = properties.getProperty(SIMPLE_OBJECT_FIELDS);
         if (simpleObjectFields == null && addBagAsMapFields == null) {
             return false;
         }
@@ -501,7 +518,7 @@ public class VespaDocumentOperation extends EvalFunc<String> {
             return false;
         }
         String createTensorFields = properties.getProperty(CREATE_TENSOR_FIELDS);
-        String addTensorFields = properties.getProperty(ADD_TENSOR_FIELDS);
+        String addTensorFields = properties.getProperty(UPDATE_TENSOR_FIELDS);
         String removeTensorFields = properties.getProperty(REMOVE_TENSOR_FIELDS);
 
         if (createTensorFields == null && addTensorFields == null && removeTensorFields == null) {
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
index b86e927e914..72d0a2ec069 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
@@ -1,7 +1,6 @@
 // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
 package com.yahoo.vespa.hadoop.pig;
 
-import com.google.gson.JsonArray;
 import org.apache.pig.data.*;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -120,7 +119,7 @@ public class VespaDocumentOperationTest {
         addToTuple("bag", DataType.BAG, bag, bagSchema, schema, tuple);
         addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
 
-        VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "remove-bag-as-map-fields=bag","operation=update");
+        VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "remove-map-fields=bag","operation=update");
         docOp.setInputSchema(schema);
         String json = docOp.exec(tuple);
 
@@ -154,7 +153,7 @@ public class VespaDocumentOperationTest {
         Tuple tuple = TupleFactory.getInstance().newTuple();
         addToTuple("bag", DataType.BAG, bag, bagSchema, schema, tuple);
         addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
-        VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "add-bag-as-map-fields=bag","operation=update");
+        VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "update-map-fields=bag","operation=update");
         docOp.setInputSchema(schema);
         String json = docOp.exec(tuple);
 
@@ -184,7 +183,7 @@ public class VespaDocumentOperationTest {
         addToTuple("id", DataType.CHARARRAY, "123", schema, tuple);
         addToTuple("tensor", DataType.MAP, tensor, schema, tuple);
 
-        VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "add-tensor-fields=tensor","operation=update");
+        VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "update-tensor-fields=tensor","operation=update");
         docOp.setInputSchema(schema);
         String json = docOp.exec(tuple);
 
@@ -232,7 +231,7 @@ public class VespaDocumentOperationTest {
         JsonNode fields = root.get("fields");
         JsonNode tensorValue = fields.get("tensor");
         JsonNode remove = tensorValue.get("remove");
-        JsonNode address = remove.get("address");
+        JsonNode address = remove.get("addresses");
         Iterator addressIterator = address.getElements();
 
-- 
cgit v1.2.3
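
For reference, below is a minimal standalone Java sketch (not part of the commit) illustrating how the getPartialOperation() lookup added in this patch resolves a field name to a partial-update operation. The class name and the hard-coded property values are illustrative assumptions; the lookup logic itself mirrors the code added above, and the example input/output matches the example given in the method's comment.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PartialOperationLookupSketch {

    // Re-implementation of the lookup: returns the mapped operation if the field name
    // is listed under one of the property names in operationMap, otherwise null.
    static String getPartialOperation(Map<String, String> operationMap, String name, Properties properties) {
        for (String label : operationMap.keySet()) {
            if (properties.getProperty(label) != null) {
                String[] p = properties.getProperty(label).split(",");
                if (Arrays.asList(p).contains(name)) {
                    return operationMap.get(label);
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Same mapping as mapPartialOperationMap in the patch.
        Map<String, String> mapPartialOperationMap = new HashMap<>();
        mapPartialOperationMap.put("remove-map-fields", "remove");
        mapPartialOperationMap.put("update-map-fields", "assign");

        // Properties as they might be parsed from hypothetical UDF arguments, e.g.
        // VespaDocumentOperation('docid=id', 'update-map-fields=date,month', 'operation=update').
        Properties properties = new Properties();
        properties.setProperty("update-map-fields", "date,month");

        System.out.println(getPartialOperation(mapPartialOperationMap, "date", properties));  // prints: assign
        System.out.println(getPartialOperation(mapPartialOperationMap, "title", properties)); // prints: null
    }
}

A null result is what makes writePartialUpdate() fall back to a plain "assign" of the whole field, while a non-null result selects the map- or tensor-specific partial operation.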