diff options
Diffstat (limited to 'vespa-hadoop')
-rw-r--r-- | vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java | 9 | ||||
-rw-r--r-- | vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java | 44 |
2 files changed, 53 insertions, 0 deletions
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
index 94176bbb658..f433ec9a35e 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperation.java
@@ -67,6 +67,7 @@ public class VespaDocumentOperation extends EvalFunc<String> {
     private static final String PROPERTY_CREATE_IF_NON_EXISTENT = "create-if-non-existent";
     private static final String PROPERTY_ID_TEMPLATE = "docid";
     private static final String PROPERTY_OPERATION = "operation";
+    private static final String PROPERTY_PRINT_DOCID_WHEN_ERROR = "print-docid-when-error";
     private static final String BAG_AS_MAP_FIELDS = "bag-as-map-fields";
     private static final String SIMPLE_ARRAY_FIELDS = "simple-array-fields";
     private static final String SIMPLE_OBJECT_FIELDS = "simple-object-fields";
@@ -80,6 +81,7 @@ public class VespaDocumentOperation extends EvalFunc<String> {
     private static final String PARTIAL_UPDATE_ASSIGN = "assign";
     private static final String PARTIAL_UPDATE_ADD = "add";
     private static final String PARTIAL_UPDATE_REMOVE = "remove";
+    private static boolean printIdOnError;
 
     private static Map<String, String> mapPartialOperationMap;
 
@@ -113,6 +115,7 @@ public class VespaDocumentOperation extends EvalFunc<String> {
         properties = VespaConfiguration.loadProperties(params);
         template = properties.getProperty(PROPERTY_ID_TEMPLATE);
         operation = Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put"));
+        printIdOnError = Boolean.parseBoolean(properties.getProperty(PROPERTY_PRINT_DOCID_WHEN_ERROR, "false"));
     }
 
     @Override
@@ -157,6 +160,12 @@ public class VespaDocumentOperation extends EvalFunc<String> {
             if (statusReporter != null) {
                 statusReporter.incrCounter("Vespa Document Operation Counters", "Document operation failed", 1);
             }
+            if (printIdOnError) {
+                Schema inputSchema = getInputSchema();
+                Map<String, Object> fields = TupleTools.tupleMap(inputSchema, tuple);
+                String docId = TupleTools.toString(fields, template);
+                System.out.println("Error occur when processing document with docID: " +docId);
+            }
             StringBuilder sb = new StringBuilder();
             sb.append("Caught exception processing input row: \n");
             sb.append(tuple.toString());
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
index 67003273cac..31f68807736 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaDocumentOperationTest.java
@@ -6,9 +6,14 @@ import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -16,9 +21,22 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 
 @SuppressWarnings("serial")
 public class VespaDocumentOperationTest {
+    private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+    private final PrintStream originalOut = System.out;
+
+    @Before
+    public void setUpStreams() {
+        System.setOut(new PrintStream(outContent));
+    }
+
+    @After
+    public void restoreStreams() {
+        System.setOut(originalOut);
+    }
     @Test
     public void requireThatUDFReturnsCorrectJson() throws Exception {
         String json = getDocumentOperationJson("docid=id:<application>:metrics::<name>-<date>");
@@ -539,6 +557,32 @@ public class VespaDocumentOperationTest {
         assertEquals(234567, bagNode.get("234567").asInt());
     }
 
+    @Test
+    public void requireThatUDFPrintIdWhenError() throws IOException {
+        DataBag bag = BagFactory.getInstance().newDefaultBag();
+
+        Schema objectSchema = new Schema();
+        Tuple objectTuple = TupleFactory.getInstance().newTuple();
+        addToTuple("key", DataType.CHARARRAY, "123456", objectSchema, objectTuple);
+        addToTuple("value", DataType.INTEGER, 123456, objectSchema, objectTuple);
+        bag.add(objectTuple);
+
+        objectSchema = new Schema();
+        objectTuple = TupleFactory.getInstance().newTuple();
+        addToTuple("key", DataType.CHARARRAY, "234567", objectSchema, objectTuple);
+        addToTuple("value", DataType.INTEGER, 234567, objectSchema, objectTuple);
+        bag.add(objectTuple);
+
+        Schema schema = new Schema();
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+        addToTuple("bag", DataType.BAG, bag, objectSchema, schema, tuple);
+
+        VespaDocumentOperation docOp = new VespaDocumentOperation("docid=7654321", "simple-object-fields=bag","print-docid-when-error=true");
+        docOp.setInputSchema(schema);
+        String json = docOp.exec(tuple);
+
+        assertThat(outContent.toString(), CoreMatchers.containsString("Error occur when processing document with docID: 7654321"));
+    }
 
     private void addToTuple(String alias, byte type, Object value, Schema schema, Tuple tuple) {
         schema.add(new Schema.FieldSchema(alias, type));