summaryrefslogtreecommitdiffstats
path: root/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java')
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java187
1 files changed, 187 insertions, 0 deletions
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
new file mode 100644
index 00000000000..d5000b2b328
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/pig/VespaStorage.java
@@ -0,0 +1,187 @@
+package com.yahoo.vespa.hadoop.pig;
+
+import com.yahoo.vespa.hadoop.mapreduce.VespaOutputFormat;
+import com.yahoo.vespa.hadoop.mapreduce.util.TupleTools;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+import java.io.*;
+import java.util.Base64;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A small Pig UDF wrapper around the Vespa http client for
+ * feeding data into a Vespa endpoint.
+ *
+ * @author lesters
+ */
+@SuppressWarnings("rawtypes")
+public class VespaStorage extends StoreFunc {
+
+ private final boolean createDocOp;
+ private final String template;
+ private final VespaDocumentOperation.Operation operation;
+
+ private String signature = null;
+ private RecordWriter recordWriter = null;
+ private ResourceSchema resourceSchema = null;
+ private Properties properties = new Properties();
+
+ private static final String PROPERTY_CREATE_DOC_OP = "create-document-operation";
+ private static final String PROPERTY_ID_TEMPLATE = "docid";
+ private static final String PROPERTY_OPERATION = "operation";
+ private static final String PROPERTY_RESOURCE_SCHEMA = "resource_schema";
+
+ public VespaStorage() {
+ createDocOp = false;
+ template = null;
+ operation = null;
+ }
+
+ public VespaStorage(String... params) {
+ properties = VespaConfiguration.loadProperties(params);
+ createDocOp = Boolean.parseBoolean(properties.getProperty(PROPERTY_CREATE_DOC_OP, "false"));
+ operation = VespaDocumentOperation.Operation.fromString(properties.getProperty(PROPERTY_OPERATION, "put"));
+ template = properties.getProperty(PROPERTY_ID_TEMPLATE);
+ }
+
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ return new VespaOutputFormat(properties);
+ }
+
+
+ @Override
+ public void setStoreLocation(String endpoint, Job job) throws IOException {
+ properties.setProperty(VespaConfiguration.ENDPOINT, endpoint);
+ }
+
+
+ @Override
+ public void prepareToWrite(RecordWriter recordWriter) throws IOException {
+ this.recordWriter = recordWriter;
+ this.resourceSchema = getResourceSchema();
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void putNext(Tuple tuple) throws IOException {
+ if (tuple == null || tuple.size() == 0) {
+ return;
+ }
+
+ String data = null;
+ if (createDocOp) {
+ data = createDocumentOperation(tuple);
+ } else if (!tuple.isNull(0)) {
+ data = tuple.get(0).toString(); // assume single field with correctly formatted doc op.
+ }
+
+ if (data == null || data.length() == 0) {
+ return;
+ }
+
+ try {
+ recordWriter.write(0, data);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+
+ @Override
+ public void checkSchema(ResourceSchema resourceSchema) throws IOException {
+ setResourceSchema(resourceSchema);
+ }
+
+
+ @Override
+ public String relToAbsPathForStoreLocation(String endpoint, Path path) throws IOException {
+ return endpoint;
+ }
+
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String s) {
+ this.signature = s;
+ }
+
+
+ @Override
+ public void cleanupOnFailure(String s, Job job) throws IOException {
+ }
+
+
+ @Override
+ public void cleanupOnSuccess(String s, Job job) throws IOException {
+ }
+
+
+ private ResourceSchema getResourceSchema() throws IOException {
+ Properties properties = getUDFProperties();
+ return base64Deserialize(properties.getProperty(PROPERTY_RESOURCE_SCHEMA));
+ }
+
+
+ private void setResourceSchema(ResourceSchema schema) throws IOException {
+ Properties properties = getUDFProperties();
+ if (properties.getProperty(PROPERTY_RESOURCE_SCHEMA) == null) {
+ properties.setProperty(PROPERTY_RESOURCE_SCHEMA, base64Serialize(schema));
+ }
+ }
+
+
+ private Properties getUDFProperties() {
+ String[] context = { signature };
+ return UDFContext.getUDFContext().getUDFProperties(getClass(), context);
+ }
+
+
+ private String createDocumentOperation(Tuple tuple) throws IOException {
+ if (tuple == null || tuple.size() == 0) {
+ return null;
+ }
+ if (resourceSchema == null) {
+ return null;
+ }
+
+ Map<String, Object> fields = TupleTools.tupleMap(resourceSchema, tuple);
+ String docId = TupleTools.toString(fields, template);
+
+ return VespaDocumentOperation.create(operation, docId, fields, properties);
+ }
+
+
+ public static String base64Serialize(Object o) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(o);
+ }
+ return Base64.getEncoder().encodeToString(baos.toByteArray());
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public static <T extends Serializable> T base64Deserialize(String s) throws IOException {
+ Object ret;
+ byte[] data = Base64.getDecoder().decode(s);
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
+ try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+ ret = ois.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ return (T) ret;
+ }
+
+}