summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLester Solbakken <lesters@users.noreply.github.com>2020-01-07 10:49:29 +0100
committerGitHub <noreply@github.com>2020-01-07 10:49:29 +0100
commita63642e0c917fb323f53377d6644ef1126cad9eb (patch)
treeec513e555933c72987458f014f31f322d3b55899
parent07ede356ca51b412fa893387fc607e37832d3d27 (diff)
parentd8aec148b3d23783337af05d2f579667615e0d92 (diff)
Merge pull request #11617 from leisureshadow/vespa-doc-op-improve
Vespa doc op feature add
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java131
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java4
-rw-r--r--vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java83
3 files changed, 193 insertions, 25 deletions
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
index 7e16979056c..fa463a77923 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
@@ -13,6 +13,7 @@ import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.joda.time.DateTime;
import java.io.ByteArrayOutputStream;
@@ -60,9 +61,12 @@ public class VespaDocumentOperation extends EvalFunc<String> {
}
+ private static final String PROPERTY_CREATE_IF_NON_EXISTENT = "create-if-non-existent";
private static final String PROPERTY_ID_TEMPLATE = "docid";
private static final String PROPERTY_OPERATION = "operation";
+ private static final String BAG_AS_MAP_FIELDS = "bag-as-map-fields";
private static final String SIMPLE_ARRAY_FIELDS = "simple-array-fields";
+ private static final String SIMPLE_OBJECT_FIELDS = "simple-object-fields";
private static final String CREATE_TENSOR_FIELDS = "create-tensor-fields";
private static final String EXCLUDE_FIELDS = "exclude-fields";
@@ -99,11 +103,12 @@ public class VespaDocumentOperation extends EvalFunc<String> {
reporter.progress();
}
- Map<String, Object> fields = TupleTools.tupleMap(getInputSchema(), tuple);
+ Schema inputSchema = getInputSchema();
+ Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple);
String docId = TupleTools.toString(fields, template);
// create json
- json = create(operation, docId, fields, properties);
+ json = create(operation, docId, fields, properties, inputSchema);
if (json == null || json.length() == 0) {
warn("No valid document operation could be created.", PigWarning.UDF_WARNING_1);
return null;
@@ -134,7 +139,8 @@ public class VespaDocumentOperation extends EvalFunc<String> {
* @return A valid JSON Vespa document operation
* @throws IOException ...
*/
- public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties) throws IOException {
+ public static String create(Operation op, String docId, Map<String, Object> fields, Properties properties,
+ Schema schema) throws IOException {
if (op == null) {
return null;
}
@@ -152,8 +158,13 @@ public class VespaDocumentOperation extends EvalFunc<String> {
g.writeStringField(op.toString(), docId);
+ boolean createIfNonExistent = Boolean.parseBoolean(properties.getProperty(PROPERTY_CREATE_IF_NON_EXISTENT, "false"));
+ if (op == Operation.UPDATE && createIfNonExistent) {
+ writeField("create", true, DataType.BOOLEAN, g, properties, schema, op, 0);
+ }
+
if (op != Operation.REMOVE) {
- writeField("fields", fields, DataType.MAP, g, properties, op, 0);
+ writeField("fields", fields, DataType.MAP, g, properties, schema, op, 0);
}
g.writeEndObject();
@@ -164,19 +175,19 @@ public class VespaDocumentOperation extends EvalFunc<String> {
@SuppressWarnings("unchecked")
- private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Operation op, int depth) throws IOException {
+ private static void writeField(String name, Object value, Byte type, JsonGenerator g, Properties properties, Schema schema, Operation op, int depth) throws IOException {
if (shouldWriteField(name, properties, depth)) {
g.writeFieldName(name);
if (shouldWritePartialUpdate(op, depth)) {
- writePartialUpdate(value, type, g, name, properties, op, depth);
+ writePartialUpdate(value, type, g, name, properties, schema, op, depth);
} else {
- writeValue(value, type, g, name, properties, op, depth);
+ writeValue(value, type, g, name, properties, schema, op, depth);
}
}
}
@SuppressWarnings("unchecked")
- private static void writeValue(Object value, Byte type, JsonGenerator g, String name, Properties properties, Operation op, int depth) throws IOException {
+ private static void writeValue(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException {
switch (type) {
case DataType.UNKNOWN:
break;
@@ -225,31 +236,63 @@ public class VespaDocumentOperation extends EvalFunc<String> {
String k = entry.getKey().toString();
Object v = entry.getValue();
Byte t = DataType.findType(v);
- writeField(k, v, t, g, properties, op, depth+1);
+ Schema fieldSchema = (schema != null) ? schema.getField(k).schema : null;
+ writeField(k, v, t, g, properties, fieldSchema, op, depth+1);
}
}
g.writeEndObject();
break;
case DataType.TUPLE:
Tuple tuple = (Tuple) value;
- boolean writeStartArray = shouldWriteTupleStart(tuple, name, properties);
- if (writeStartArray) {
- g.writeStartArray();
- }
- for (Object v : tuple) {
- writeValue(v, DataType.findType(v), g, name, properties, op, depth);
- }
- if (writeStartArray) {
- g.writeEndArray();
+ if (shouldWriteTupleAsMap(name, properties)) {
+ Map<String, Object> fields = TupleTools.tupleMap(schema, tuple);
+ writeValue(fields, DataType.MAP, g, name, properties, schema, op, depth);
+ } else {
+ boolean writeStartArray = shouldWriteTupleStart(tuple, name, properties);
+ if (writeStartArray) {
+ g.writeStartArray();
+ }
+ for (Object v : tuple) {
+ writeValue(v, DataType.findType(v), g, name, properties, schema, op, depth);
+ }
+ if (writeStartArray) {
+ g.writeEndArray();
+ }
}
break;
case DataType.BAG:
DataBag bag = (DataBag) value;
- g.writeStartArray();
- for (Tuple t : bag) {
- writeValue(t, DataType.TUPLE, g, name, properties, op, depth);
+ // get the schema of the tuple in bag
+ schema = (schema != null) ? schema.getField(0).schema : null;
+
+ if (shouldWriteBagAsMap(name, properties)) {
+ // when treating bag as map, the schema of bag should be {(key, val)....}
+ // the size of tuple in bag should be 2. 1st one is key. 2nd one is val.
+ Schema valueSchema = (schema != null) ? schema.getField(1).schema : null;
+
+ g.writeStartObject();
+ for (Tuple element : bag) {
+ if (element.size() != 2) {
+ continue;
+ }
+ String k = (String) element.get(0);
+ Object v = element.get(1);
+ Byte t = DataType.findType(v);
+ if (t == DataType.TUPLE) {
+ Map<String, Object> fields = TupleTools.tupleMap(valueSchema, (Tuple) v);
+ writeField(k, fields, DataType.MAP, g, properties, valueSchema, op, depth);
+ } else {
+ writeField(k, v, t, g, properties, valueSchema, op, depth);
+ }
+ }
+ g.writeEndObject();
+ } else {
+ g.writeStartArray();
+ for (Tuple t : bag) {
+ writeValue(t, DataType.TUPLE, g, name, properties, schema, op, depth);
+ }
+ g.writeEndArray();
}
- g.writeEndArray();
break;
}
@@ -259,10 +302,10 @@ public class VespaDocumentOperation extends EvalFunc<String> {
return op == Operation.UPDATE && depth == 1;
}
- private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Operation op, int depth) throws IOException {
+ private static void writePartialUpdate(Object value, Byte type, JsonGenerator g, String name, Properties properties, Schema schema, Operation op, int depth) throws IOException {
g.writeStartObject();
g.writeFieldName(PARTIAL_UPDATE_ASSIGN); // TODO: lookup field name in a property to determine correct operation
- writeValue(value, type, g, name, properties, op, depth);
+ writeValue(value, type, g, name, properties, schema, op, depth);
g.writeEndObject();
}
@@ -286,6 +329,46 @@ public class VespaDocumentOperation extends EvalFunc<String> {
return true;
}
+ /** Returns true when tuples in field {@code name} should be written as JSON objects ("simple-object-fields"). */
+ private static boolean shouldWriteTupleAsMap(String name, Properties properties) {
+     return isFieldListedInProperty(name, properties, SIMPLE_OBJECT_FIELDS);
+ }
+
+ /** Returns true when the bag in field {@code name} should be written as a JSON object ("bag-as-map-fields"). */
+ private static boolean shouldWriteBagAsMap(String name, Properties properties) {
+     return isFieldListedInProperty(name, properties, BAG_AS_MAP_FIELDS);
+ }
+
+ /**
+  * Shared lookup behind the two field-selection checks above, which previously duplicated this logic.
+  *
+  * @param name        field name to look for
+  * @param properties  UDF configuration; may be null, in which case nothing is selected
+  * @param propertyKey property holding either "*" or a comma-separated list of field names
+  * @return true if the property value is "*" or lists {@code name} (entries trimmed, case-insensitive);
+  *         false if the properties or the property value are missing
+  */
+ private static boolean isFieldListedInProperty(String name, Properties properties, String propertyKey) {
+     if (properties == null) {
+         return false;
+     }
+     String selection = properties.getProperty(propertyKey);
+     if (selection == null) {
+         return false;
+     }
+     if (selection.equals("*")) {
+         // Wildcard: every field is selected.
+         return true;
+     }
+     // Entries are trimmed before comparison, so "a, b" selects both a and b.
+     for (String field : selection.split(",")) {
+         if (field.trim().equalsIgnoreCase(name)) {
+             return true;
+         }
+     }
+     return false;
+ }
+
private static boolean shouldCreateTensor(Map<Object, Object> map, String name, Properties properties) {
if (properties == null) {
return false;
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
index f0283636725..0ef90214f41 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
@@ -11,6 +11,7 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.UDFContext;
import java.io.*;
@@ -160,7 +161,8 @@ public class VespaStorage extends StoreFunc {
Map<String, Object> fields = TupleTools.tupleMap(resourceSchema, tuple);
String docId = TupleTools.toString(fields, template);
- return VespaDocumentOperation.create(operation, docId, fields, properties);
+ Schema schema = Schema.getPigSchema(resourceSchema);
+ return VespaDocumentOperation.create(operation, docId, fields, properties, schema);
}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
index 6ff13f5c4fa..7d0fe72fc64 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
@@ -2,6 +2,7 @@
package com.yahoo.vespa.hadoop.pig;
import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
@@ -48,6 +49,22 @@ public class VespaDocumentOperationTest {
@Test
+ public void requireThatUDFSupportsCreateIfNonExistent() throws IOException { // update + create-if-non-existent=true must emit a top-level "create": true
+ String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>", "operation=update",
+ "create-if-non-existent=true"); // input tuple is built by getDocumentOperationJson, defined outside this hunk — presumably application/name/date/value; verify
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.path("fields");
+
+ assertEquals("id:testapp:metrics::clicks-20160112", root.get("update").getTextValue()); // operation name is the key, expanded document id the value
+ assertEquals(true, root.get("create").getBooleanValue()); // flag sits next to the operation, outside "fields"
+ assertEquals("testapp", fields.get("application").get("assign").getTextValue()); // each field value of an update is wrapped in "assign"
+ assertEquals("clicks", fields.get("name").get("assign").getTextValue());
+ assertEquals(3, fields.get("value").get("assign").getIntValue());
+ }
+
+
+ @Test
public void requireThatUDFReturnsNullForMissingConfig() throws Exception {
String json = getDocumentOperationJson();
assertNull(json);
@@ -266,10 +283,76 @@ public class VespaDocumentOperationTest {
}
+ @Test
+ public void requireThatUDFSupportsSimpleObjectFields() throws IOException { // a tuple field listed in simple-object-fields must be rendered as a JSON object
+ Schema objectSchema = new Schema(); // schema of the nested tuple, filled in step with objectTuple
+ Tuple objectTuple = TupleFactory.getInstance().newTuple();
+ addToTuple("id", DataType.LONG, 123456789L, objectSchema, objectTuple);
+ addToTuple("url", DataType.CHARARRAY, "example.com", objectSchema, objectTuple);
+ addToTuple("value", DataType.INTEGER, 123, objectSchema, objectTuple);
+
+ Schema schema = new Schema(); // top-level input schema: a single TUPLE field "object" carrying objectSchema
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+ addToTuple("object", DataType.TUPLE, objectTuple, objectSchema, schema, tuple);
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "simple-object-fields=object"); // no "operation" given — presumably a default that writes plain "fields"; verify
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+ JsonNode objectNode = fields.get("object");
+
+ assertEquals(123456789L, objectNode.get("id").asLong()); // object keys are the aliases declared in objectSchema
+ assertEquals("example.com", objectNode.get("url").asText());
+ assertEquals(123, objectNode.get("value").asInt());
+ }
+
+
+ @Test
+ public void requireThatUDFSupportsBagAsMapFields() throws IOException { // a bag of (key, value) tuples listed in bag-as-map-fields must be rendered as a JSON object
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
+
+ Schema objectSchema = new Schema(); // schema for the first (key, value) tuple
+ Tuple objectTuple = TupleFactory.getInstance().newTuple();
+ addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple);
+ addToTuple("value", DataType.INTEGER, 123456, objectSchema, objectTuple);
+ bag.add(objectTuple);
+
+ objectSchema = new Schema(); // rebuilt for the second tuple; only this last instance is attached to the bag field below
+ objectTuple = TupleFactory.getInstance().newTuple();
+ addToTuple("key", DataType.CHARARRAY, "234567", objectSchema, objectTuple);
+ addToTuple("value", DataType.INTEGER, 234567, objectSchema, objectTuple);
+ bag.add(objectTuple);
+
+ Schema schema = new Schema();
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+ addToTuple("bag", DataType.BAG, bag, objectSchema, schema, tuple); // NOTE(review): the (key, value) schema is attached directly to the bag field, with no intermediate tuple field — confirm this matches the schema shape Pig supplies at runtime
+
+ VespaDocumentOperation docOp = new VespaDocumentOperation("docid=empty", "bag-as-map-fields=bag");
+ docOp.setInputSchema(schema);
+ String json = docOp.exec(tuple);
+
+ ObjectMapper m = new ObjectMapper();
+ JsonNode root = m.readTree(json);
+ JsonNode fields = root.get("fields");
+ JsonNode bagNode = fields.get("bag");
+
+ assertEquals(123456, bagNode.get("123456").asInt()); // first tuple element is the JSON key, second the value
+ assertEquals(234567, bagNode.get("234567").asInt());
+ }
+
+
private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) {
schema.add(new Schema.FieldSchema(alias, type));
tuple.append(value);
}
+ private void addToTuple(String alias, byte type, Object value, Schema schemaInField, Schema schema, Tuple tuple) // overload for fields that carry a nested schema (used above for TUPLE and BAG fields)
+ throws FrontendException {
+ schema.add(new Schema.FieldSchema(alias, schemaInField, type)); // register the field with its nested schema...
+ tuple.append(value); // ...and append the value at the matching position
+ }
}