From a3756480c7886ad029db6f2fbfaacb088d3ffee1 Mon Sep 17 00:00:00 2001 From: jhuang02 Date: Wed, 25 Mar 2020 10:38:43 +0800 Subject: add support for partial update tensor and map --- .../vespa/hadoop/pig/VespaDocumentOperation.java | 244 +++++++++++++++++---- .../hadoop/pig/VespaDocumentOperationTest.java | 173 ++++++++++++++- 2 files changed, 368 insertions(+), 49 deletions(-) diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java index 32cdbf9af5c..9427ae962d1 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java @@ -68,10 +68,18 @@ public class VespaDocumentOperation extends EvalFunc { private static final String SIMPLE_ARRAY_FIELDS = "simple-array-fields"; private static final String SIMPLE_OBJECT_FIELDS = "simple-object-fields"; private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields"; + private static final String REMOVE_TENSOR_FIELDS = "remove-tensor-fields"; + private static final String ADD_TENSOR_FIELDS = "add-tensor-fields"; + private static final String REMOVE_BAG_AS_MAP_FIELDS = "remove-bag-as-map-fields"; + private static final String ADD_BAG_AS_MAP_FIELDS = "add-bag-as-map-fields"; private static final String EXCLUDE_FIELDS = "exclude-fields"; private static final String TESTSET_CONDITION = "condition"; + private static final String PARTIAL_UPDATE_ASSIGN = "assign"; + private static final String PARTIAL_UPDATE_ADD = "add"; + private static final String PARTIAL_UPDATE_REMOVE = "remove"; + private final String template; private final Operation operation; @@ -107,7 +115,7 @@ public class VespaDocumentOperation extends EvalFunc { Schema inputSchema = getInputSchema(); Map fields = TupleTools.tupleMap(inputSchema, tuple); String docId = TupleTools.toString(fields, template); - + 
System.out.println(docId); // create json json = create(operation, docId, fields, properties, inputSchema); if (json == null || json.length() == 0) { @@ -182,12 +190,47 @@ public class VespaDocumentOperation extends EvalFunc { @SuppressWarnings("unchecked") private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth) throws IOException { if (shouldWriteField(name, properties, depth)) { - g.writeFieldName(name); - if (shouldWritePartialUpdate(op, depth)) { - writePartialUpdate(value, type, g, name, properties, schema, op, depth); - } else { - writeValue(value, type, g, name, properties, schema, op, depth); + if (isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties, g, PARTIAL_UPDATE_REMOVE, false) || + isPartialOperation(ADD_BAG_AS_MAP_FIELDS, name, properties, g, PARTIAL_UPDATE_ASSIGN, false)){ + schema = (schema != null) ? schema.getField(0).schema : null; + // extract the key of map and keys in map for writing json when partial updating maps + Schema valueSchema = (schema != null) ? 
schema.getField(1).schema : null; + // data format { ( key: id, value: (abc,123,(123234,bbaa))) } + // the first element of each tuple in the bag will be the map to update + // the second element of each tuple in the bag will be the new value of the map + DataBag bag = (DataBag) value; + for (Tuple element : bag) { + if (element.size() != 2) { + continue; + } + String k = (String) element.get(0); + Object v = element.get(1); + Byte t = DataType.findType(v); + if (t == DataType.TUPLE) { + g.writeFieldName(name + "{" + k + "}"); + if (isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties, g, PARTIAL_UPDATE_REMOVE, false)) { + g.writeStartObject(); + g.writeFieldName(PARTIAL_UPDATE_REMOVE); + g.writeNumber(0); + g.writeEndObject(); + }else{ + if (shouldWritePartialUpdate(op, depth)) { + writePartialUpdate(v, t, g, name, properties, valueSchema, op, depth+1); + } else { + writeValue(v, t, g, name, properties, valueSchema, op, depth+1); + } + } + } + } + }else{ + g.writeFieldName(name); + if (shouldWritePartialUpdate(op, depth)) { + writePartialUpdate(value, type, g, name, properties, schema, op, depth); + } else { + writeValue(value, type, g, name, properties, schema, op, depth); + } } + } } @@ -235,7 +278,7 @@ public class VespaDocumentOperation extends EvalFunc { g.writeStartObject(); Map map = (Map) value; if (shouldCreateTensor(map, name, properties)) { - writeTensor(map, g); + writeTensor(map, g, isRemoveTensor(name, properties)); } else { for (Map.Entry entry : map.entrySet()) { String k = entry.getKey().toString(); @@ -269,7 +312,7 @@ public class VespaDocumentOperation extends EvalFunc { DataBag bag = (DataBag) value; // get the schema of the tuple in bag schema = (schema != null) ? schema.getField(0).schema : null; - if (shouldWriteBagAsMap(name, properties)) { // when treating bag as map, the schema of bag should be {(key, val)....} // the size of tuple in bag should be 2. 1st one is key. 2nd one is val. 
@@ -285,9 +327,9 @@ public class VespaDocumentOperation extends EvalFunc { Byte t = DataType.findType(v); if (t == DataType.TUPLE) { Map fields = TupleTools.tupleMap(valueSchema, (Tuple) v); - writeField(k, fields, DataType.MAP, g, properties, valueSchema, op, depth); + writeField(k, fields, DataType.MAP, g, properties, valueSchema, op, depth+1); } else { - writeField(k, v, t, g, properties, valueSchema, op, depth); + writeField(k, v, t, g, properties, valueSchema, op, depth+1); } } g.writeEndObject(); @@ -309,9 +351,32 @@ public class VespaDocumentOperation extends EvalFunc { private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException { g.writeStartObject(); - g.writeFieldName(PARTIAL_UPDATE_ASSIGN); // TODO: lookup field name in a property to determine correct operation + // look up which operation to do by checking names and their respective properties + if (!isPartialOperation(REMOVE_TENSOR_FIELDS, name, properties, g, PARTIAL_UPDATE_REMOVE, true) + && !isPartialOperation(REMOVE_BAG_AS_MAP_FIELDS, name, properties, g, PARTIAL_UPDATE_REMOVE, true) + && !isPartialOperation(ADD_TENSOR_FIELDS, name, properties, g, PARTIAL_UPDATE_ADD, true)) { + g.writeFieldName(PARTIAL_UPDATE_ASSIGN); + } writeValue(value, type, g, name, properties, schema, op, depth); g.writeEndObject(); + + + } + + private static boolean isPartialOperation(String label, String name, Properties properties, JsonGenerator g, String targetOperation, boolean writeFieldName) throws IOException{ + // when dealing with partial update operations, write the desired operation + // writeFieldName decides if a field name should be written when checking + boolean isPartialOperation = false; + if (properties.getProperty(label) != null) { + String[] p = properties.getProperty(label).split(","); + if (Arrays.asList(p).contains(name)) { + if (writeFieldName) { + g.writeFieldName(targetOperation); + } 
+ isPartialOperation = true; + } + } + return isPartialOperation; } private static boolean shouldWriteTupleStart(Tuple tuple, String name, Properties properties) { @@ -335,21 +400,38 @@ public class VespaDocumentOperation extends EvalFunc { } private static boolean shouldWriteTupleAsMap(String name, Properties properties) { + // include ADD_BAG_AS_MAP_FIELDS here because when updating the map + // the second element in each tuple should be written as a map if (properties == null) { return false; } + String addBagAsMapFields = properties.getProperty(ADD_BAG_AS_MAP_FIELDS); String simpleObjectFields = properties.getProperty(SIMPLE_OBJECT_FIELDS); - if (simpleObjectFields == null) { + if (simpleObjectFields == null && addBagAsMapFields == null) { return false; } - if (simpleObjectFields.equals("*")) { - return true; + if (addBagAsMapFields != null){ + if (addBagAsMapFields.equals("*")) { + return true; + } + String[] fields = addBagAsMapFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + } - String[] fields = simpleObjectFields.split(","); - for (String field : fields) { - if (field.trim().equalsIgnoreCase(name)) { + if(simpleObjectFields != null){ + if (simpleObjectFields.equals("*")) { return true; } + String[] fields = simpleObjectFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } } return false; } @@ -378,11 +460,50 @@ public class VespaDocumentOperation extends EvalFunc { if (properties == null) { return false; } - String tensorFields = properties.getProperty(CREATE_TENSOR_FIELDS); - if (tensorFields == null) { + String createTensorFields = properties.getProperty(CREATE_TENSOR_FIELDS); + String addTensorFields = properties.getProperty(ADD_TENSOR_FIELDS); + String removeTensorFields = properties.getProperty(REMOVE_TENSOR_FIELDS); + + if (createTensorFields == null && addTensorFields == null && removeTensorFields == null) { return 
false; } - String[] fields = tensorFields.split(","); + String[] fields; + if (createTensorFields != null) { + fields = createTensorFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + } + if (addTensorFields != null) { + fields = addTensorFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + } + if (removeTensorFields != null) { + fields = removeTensorFields.split(","); + for (String field : fields) { + if (field.trim().equalsIgnoreCase(name)) { + return true; + } + } + } + return false; + } + + private static boolean isRemoveTensor(String name, Properties properties){ + if (properties == null) { + return false; + } + String removeTensorFields = properties.getProperty(REMOVE_TENSOR_FIELDS); + if (removeTensorFields == null) { + return false; + } + String[] fields = removeTensorFields.split(","); for (String field : fields) { if (field.trim().equalsIgnoreCase(name)) { return true; @@ -408,45 +529,72 @@ public class VespaDocumentOperation extends EvalFunc { return true; } - private static void writeTensor(Map map, JsonGenerator g) throws IOException { - g.writeFieldName("cells"); + private static void writeTensor(Map map, JsonGenerator g, Boolean isRemoveTensor) throws IOException { + if (!isRemoveTensor){ + g.writeFieldName("cells"); + }else{ + g.writeFieldName("address"); + } g.writeStartArray(); for (Map.Entry entry : map.entrySet()) { String k = entry.getKey().toString(); Double v = Double.parseDouble(entry.getValue().toString()); - g.writeStartObject(); - // Write address - g.writeFieldName("address"); - g.writeStartObject(); + if (!isRemoveTensor){ - String[] dimensions = k.split(","); - for (String dimension : dimensions) { - if (dimension == null || dimension.isEmpty()) { - continue; - } - String[] address = dimension.split(":"); - if (address.length != 2) { - throw new IllegalArgumentException("Malformed cell address: " + 
dimension); - } - String dim = address[0]; - String label = address[1]; - if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) { - throw new IllegalArgumentException("Malformed cell address: " + dimension); + g.writeStartObject(); + + g.writeFieldName("address"); + g.writeStartObject(); + + String[] dimensions = k.split(","); + for (String dimension : dimensions) { + if (dimension == null || dimension.isEmpty()) { + continue; + } + String[] address = dimension.split(":"); + if (address.length != 2) { + throw new IllegalArgumentException("Malformed cell address: " + dimension); + } + String dim = address[0]; + String label = address[1]; + if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) { + throw new IllegalArgumentException("Malformed cell address: " + dimension); + } + g.writeFieldName(dim.trim()); + g.writeString(label.trim()); } - g.writeFieldName(dim.trim()); - g.writeString(label.trim()); - } - g.writeEndObject(); + g.writeEndObject(); - // Write value - g.writeFieldName("value"); - g.writeNumber(v); + // Write value + g.writeFieldName("value"); + g.writeNumber(v); - g.writeEndObject(); + g.writeEndObject(); + + }else{ + String[] dimensions = k.split(","); + for (String dimension : dimensions) { + g.writeStartObject(); + if (dimension == null || dimension.isEmpty()) { + continue; + } + String[] address = dimension.split(":"); + if (address.length != 2) { + throw new IllegalArgumentException("Malformed cell address: " + dimension); + } + String dim = address[0]; + String label = address[1]; + if (dim == null || label == null || dim.isEmpty() || label.isEmpty()) { + throw new IllegalArgumentException("Malformed cell address: " + dimension); + } + g.writeFieldName(dim.trim()); + g.writeString(label.trim()); + g.writeEndObject(); + } + } } g.writeEndArray(); } - } diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java 
b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java index 3c6805019b8..b86e927e914 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hadoop.pig; +import com.google.gson.JsonArray; import org.apache.pig.data.*; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; @@ -10,6 +11,7 @@ import org.junit.Test; import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -18,7 +20,6 @@ import static org.junit.Assert.assertNull; @SuppressWarnings("serial") public class VespaDocumentOperationTest { - @Test public void requireThatUDFReturnsCorrectJson() throws Exception { String json = getDocumentOperationJson("docid=id::metrics::-"); @@ -84,6 +85,170 @@ public class VespaDocumentOperationTest { } + @Test + public void requireThatUDFCorrectlyGeneratesRemoveBagAsMapOperation() throws Exception { + + DataBag bag = BagFactory.getInstance().newDefaultBag(); + + Schema innerObjectSchema = new Schema(); + Tuple innerObjectTuple = TupleFactory.getInstance().newTuple(); + addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple); + addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple); + + Schema objectSchema = new Schema(); + Tuple objectTuple = TupleFactory.getInstance().newTuple(); + addToTuple("key", DataType.CHARARRAY, "234566", objectSchema, objectTuple); + addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple); + + Schema bagSchema = new Schema(); + 
addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag); + + innerObjectSchema = new Schema(); + innerObjectTuple = TupleFactory.getInstance().newTuple(); + addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple); + addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple); + + objectSchema = new Schema(); + objectTuple = TupleFactory.getInstance().newTuple(); + addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple); + addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple); + + addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag); + + Schema schema = new Schema(); + Tuple tuple = TupleFactory.getInstance().newTuple(); + addToTuple("bag", DataType.BAG, bag, bagSchema, schema, tuple); + addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); + + VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "remove-bag-as-map-fields=bag","operation=update"); + docOp.setInputSchema(schema); + String json = docOp.exec(tuple); + + ObjectMapper m = new ObjectMapper(); + JsonNode root = m.readTree(json); + JsonNode fields = root.get("fields"); + assertEquals("{\"remove\":0}", fields.get("bag{123456}").toString()); + assertEquals("{\"remove\":0}", fields.get("bag{234566}").toString()); + + } + + @Test + public void requireThatUDFCorrectlyGeneratesAddBagAsMapOperation() throws Exception { + + DataBag bag = BagFactory.getInstance().newDefaultBag(); + + Schema innerObjectSchema = new Schema(); + Tuple innerObjectTuple = TupleFactory.getInstance().newTuple(); + addToTuple("year", DataType.CHARARRAY, "2020", innerObjectSchema, innerObjectTuple); + addToTuple("month", DataType.INTEGER, 3, innerObjectSchema, innerObjectTuple); + + Schema objectSchema = new Schema(); + Tuple objectTuple = TupleFactory.getInstance().newTuple(); + addToTuple("key", DataType.CHARARRAY, "123456", 
objectSchema, objectTuple); + addToTuple("value", DataType.TUPLE, innerObjectTuple,innerObjectSchema,objectSchema, objectTuple); + + Schema bagSchema = new Schema(); + addToBagWithSchema("firstLayerTuple",DataType.TUPLE,objectTuple,objectSchema,bagSchema,bag); + + Schema schema = new Schema(); + Tuple tuple = TupleFactory.getInstance().newTuple(); + addToTuple("bag", DataType.BAG, bag, bagSchema, schema, tuple); + addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); + VespaDocumentOperation docOp = new VespaDocumentOperation("docid=id", "add-bag-as-map-fields=bag","operation=update"); + docOp.setInputSchema(schema); + String json = docOp.exec(tuple); + + ObjectMapper m = new ObjectMapper(); + JsonNode root = m.readTree(json); + + JsonNode fields = root.get("fields"); + JsonNode value = fields.get("bag{123456}"); + JsonNode assign = value.get("assign"); + assertEquals("2020", assign.get("year").getTextValue()); + assertEquals(3, assign.get("month").getIntValue()); + } + + @Test + public void requireThatUDFCorrectlyGeneratesAddTensorOperation() throws Exception { + + Schema schema = new Schema(); + Tuple tuple = TupleFactory.getInstance().newTuple(); + + // Please refer to the tensor format documentation + + Map tensor = new HashMap() {{ + put("x:label1,y:label2,z:label4", 2.0); + put("x:label3", 3.0); + }}; + + addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); + addToTuple("tensor", DataType.MAP, tensor, schema, tuple); + + VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "add-tensor-fields=tensor","operation=update"); + docOp.setInputSchema(schema); + String json = docOp.exec(tuple); + + ObjectMapper m = new ObjectMapper(); + JsonNode root = m.readTree(json); + JsonNode fields = root.get("fields"); + JsonNode tensorValue = fields.get("tensor"); + JsonNode add = tensorValue.get("add"); + JsonNode cells = add.get("cells"); + Iterator cellsIterator = cells.getElements(); + + JsonNode element = cellsIterator.next(); + 
assertEquals("label1", element.get("address").get("x").getTextValue()); + assertEquals("label2", element.get("address").get("y").getTextValue()); + assertEquals("label4", element.get("address").get("z").getTextValue()); + assertEquals("2.0", element.get("value").toString()); + + element = cellsIterator.next(); + assertEquals("label3", element.get("address").get("x").getTextValue()); + assertEquals("3.0", element.get("value").toString()); + } + + @Test + public void requireThatUDFCorrectlyGeneratesRemoveTensorOperation() throws Exception { + + Schema schema = new Schema(); + Tuple tuple = TupleFactory.getInstance().newTuple(); + + // Please refer to the tensor format documentation + + Map tensor = new HashMap() {{ + put("x:label1,y:label2,z:label4", 2.0); + put("x:label3", 3.0); + }}; + + addToTuple("id", DataType.CHARARRAY, "123", schema, tuple); + addToTuple("tensor", DataType.MAP, tensor, schema, tuple); + + VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "remove-tensor-fields=tensor","operation=update"); + docOp.setInputSchema(schema); + String json = docOp.exec(tuple); + + ObjectMapper m = new ObjectMapper(); + JsonNode root = m.readTree(json); + JsonNode fields = root.get("fields"); + JsonNode tensorValue = fields.get("tensor"); + JsonNode remove = tensorValue.get("remove"); + JsonNode address = remove.get("address"); + + Iterator addressIterator = address.getElements(); + + JsonNode element = addressIterator.next(); + assertEquals("label1", element.get("x").getTextValue()); + + element = addressIterator.next(); + assertEquals("label2", element.get("y").getTextValue()); + + element = addressIterator.next(); + assertEquals("label4", element.get("z").getTextValue()); + + element = addressIterator.next(); + assertEquals("label3", element.get("x").getTextValue()); + } + @Test public void requireThatUDFCorrectlyGeneratesRemoveOperation() throws Exception { String json = getDocumentOperationJson("operation=remove", "docid=id::metrics::-"); 
@@ -368,4 +533,10 @@ public class VespaDocumentOperationTest { schema.add(new Schema.FieldSchema(alias, schemaInField, type)); tuple.append(value); } + + private void addToBagWithSchema(String alias, byte type, Tuple value, Schema schemaInField, Schema schema,DataBag bag) + throws FrontendException { + schema.add(new Schema.FieldSchema(alias, schemaInField, type)); + bag.add(value); + } } -- cgit v1.2.3