summaryrefslogtreecommitdiffstats
path: root/vespa-hadoop
diff options
context:
space:
mode:
author Bjørn Christian Seime <bjorncs@verizonmedia.com> 2021-06-04 17:22:45 +0200
committer Bjørn Christian Seime <bjorncs@verizonmedia.com> 2021-06-10 16:45:37 +0200
commit 95fad5a7d6f010e56290491109e219f69df4185b (patch)
tree 525330c5472e45a9485c7893b6c0998de8bb39f2 /vespa-hadoop
parent 627e960f3261f0d8fbcfcebeab9fb05d5a1c1400 (diff)
Add new record writer based on vespa-feed-client
Diffstat (limited to 'vespa-hadoop')
-rw-r--r-- vespa-hadoop/pom.xml 6
-rw-r--r-- vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java 2
-rw-r--r-- vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java 156
-rw-r--r-- vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java 18
4 files changed, 171 insertions, 11 deletions
diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml
index 382c28dc884..5fa2e4aa6e8 100644
--- a/vespa-hadoop/pom.xml
+++ b/vespa-hadoop/pom.xml
@@ -112,6 +112,12 @@
<artifactId>vespa-http-client</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>vespa-feed-client</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java
index 8b320fc9436..b716c55beb5 100644
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java
@@ -36,7 +36,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Logger;
/**
- * {@link LegacyVespaRecordWriter} sends the output &lt;key, value&gt; to one or more Vespa endpoints.
+ * {@link LegacyVespaRecordWriter} sends the output &lt;key, value&gt; to one or more Vespa endpoints using vespa-http-client.
*
* @author lesters
*/
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
new file mode 100644
index 00000000000..540f0aecf24
--- /dev/null
+++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
@@ -0,0 +1,156 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.hadoop.mapreduce;
+
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.JsonFeeder;
+import ai.vespa.feed.client.JsonParseException;
+import ai.vespa.feed.client.OperationParameters;
+import ai.vespa.feed.client.OperationStats;
+import ai.vespa.feed.client.Result;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import com.yahoo.vespa.http.client.config.FeedParams;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.logging.Logger;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * {@link VespaRecordWriter} sends the output &lt;key, value&gt; to one or more Vespa endpoints using vespa-feed-client.
+ *
+ * <p>The underlying {@link JsonFeeder} is created lazily on the first call to
+ * {@link #write(Object, Object)} and is released by {@link #close(TaskAttemptContext)}.</p>
+ *
+ * @author bjorncs
+ */
+public class VespaRecordWriter extends RecordWriter<Object, Object> {
+
+    private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName());
+
+    /** Upper bound on the number of streams configured per connection. */
+    private static final int MAX_STREAMS_PER_CONNECTION = 256;
+
+    private final VespaCounters counters;
+    private final VespaConfiguration config;
+
+    private boolean initialized = false;
+    private JsonFeeder feeder;
+
+    /**
+     * @param config   feed configuration; validated on the first write, not here
+     * @param counters counters updated as documents are sent and as feed results arrive
+     */
+    protected VespaRecordWriter(VespaConfiguration config, VespaCounters counters) {
+        this.counters = counters;
+        this.config = config;
+    }
+
+    /**
+     * Feeds a single JSON operation. The key is ignored; the value is converted with
+     * {@code toString()} and trimmed before being handed to the feeder.
+     *
+     * <p>The outcome is recorded from the feeder's completion callback: a {@link JsonParseException}
+     * counts the document as skipped, any other error as failed, and success as ok.</p>
+     *
+     * @param key  unused
+     * @param data the operation to feed, rendered as JSON by its {@code toString()}
+     * @throws IllegalArgumentException on the first write if the configuration is not supported
+     */
+    @Override
+    public void write(Object key, Object data) throws IOException {
+        initializeOnFirstWrite();
+        String json = data.toString().trim();
+        feeder.feedSingle(json)
+                .whenComplete((result, error) -> {
+                    if (error != null) {
+                        if (error instanceof JsonParseException) {
+                            counters.incrementDocumentsSkipped(1);
+                        } else {
+                            log.warning("Failed to feed single document: " + error);
+                            counters.incrementDocumentsFailed(1);
+                        }
+                    } else {
+                        counters.incrementDocumentsOk(1);
+                    }
+                });
+        counters.incrementDocumentsSent(1);
+        // A non-positive interval disables progress logging instead of failing on modulo by zero
+        if (config.progressInterval() > 0 && counters.getDocumentsSent() % config.progressInterval() == 0) {
+            String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)",
+                    counters.getDocumentsSent(),
+                    counters.getDocumentsOk(),
+                    counters.getDocumentsFailed(),
+                    counters.getDocumentsSkipped());
+            log.info(progress);
+        }
+    }
+
+    /**
+     * Closes the feeder, if one was created, and resets this writer so that a later write
+     * initializes a new feeder.
+     */
+    @Override
+    public void close(TaskAttemptContext context) throws IOException {
+        if (feeder != null) {
+            feeder.close();
+            feeder = null;
+            initialized = false;
+        }
+    }
+
+    /**
+     * Override method to alter {@link FeedClient} configuration.
+     * Invoked with the pre-configured builder right before the client is built.
+     */
+    protected void onFeedClientInitialization(FeedClientBuilder builder) {}
+
+    /** Validates the configuration, applies the optional startup delay and creates the feeder, once. */
+    private void initializeOnFirstWrite() {
+        if (initialized) return;
+        validateConfig();
+        useRandomizedStartupDelayIfEnabled();
+        feeder = createJsonFeeder();
+        initialized = true;
+    }
+
+    /**
+     * Rejects configuration this implementation does not support (non-SSL, dryrun, non-JSON data format)
+     * and warns about proxy settings, which are ignored.
+     *
+     * @throws IllegalArgumentException if the configuration is not supported
+     */
+    private void validateConfig() {
+        if (!config.useSSL()) {
+            throw new IllegalArgumentException("SSL is required for this feed client implementation");
+        }
+        if (config.dryrun()) {
+            throw new IllegalArgumentException("Dryrun is not supported for this feed client implementation");
+        }
+        if (config.dataFormat() != FeedParams.DataFormat.JSON_UTF8) {
+            throw new IllegalArgumentException("Only JSON is support by this feed client implementation");
+        }
+        if (config.proxyHost() != null) {
+            log.warning(String.format("Ignoring proxy config (host='%s', port=%d)", config.proxyHost(), config.proxyPort()));
+        }
+    }
+
+    /** Sleeps for a random duration below the configured maximum, if that maximum is positive. */
+    private void useRandomizedStartupDelayIfEnabled() {
+        if (config.randomStartupSleepMs() > 0) {
+            int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs());
+            log.info("Delaying startup by " + delay + " ms");
+            try {
+                Thread.sleep(delay);
+            } catch (InterruptedException e) {
+                // Cut the delay short, but keep the interrupt flag set for the caller
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /** Builds the {@link FeedClient} from the configuration and wraps it in a {@link JsonFeeder}. */
+    private JsonFeeder createJsonFeeder() {
+        FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpointUris(config))
+                .setConnectionsPerEndpoint(config.numConnections())
+                .setMaxStreamPerConnection(streamsPerConnection(config))
+                .setRetryStrategy(retryStrategy(config));
+
+        // Give subclasses a chance to adjust the client before it is built
+        onFeedClientInitialization(feedClientBuilder);
+        FeedClient feedClient = feedClientBuilder.build();
+        JsonFeeder.Builder builder = JsonFeeder.builder(feedClient)
+                .withTimeout(Duration.ofMinutes(10));
+        if (config.route() != null) {
+            builder.withRoute(config.route());
+        }
+        return builder.build();
+    }
+
+    /** Returns a retry strategy whose retry count is the configured number of retries. */
+    private static FeedClient.RetryStrategy retryStrategy(VespaConfiguration config) {
+        int maxRetries = config.numRetries();
+        return new FeedClient.RetryStrategy() {
+            @Override public int retries() { return maxRetries; }
+        };
+    }
+
+    /**
+     * Spreads the configured max in-flight requests over the configured connections.
+     * The result is clamped to the range [1, {@value #MAX_STREAMS_PER_CONNECTION}] so that a
+     * connection count of zero, or one larger than the in-flight limit, cannot yield zero streams
+     * or a division by zero.
+     */
+    private static int streamsPerConnection(VespaConfiguration config) {
+        int connections = Math.max(1, config.numConnections());
+        int streams = config.maxInFlightRequests() / connections;
+        return Math.max(1, Math.min(MAX_STREAMS_PER_CONNECTION, streams));
+    }
+
+    /**
+     * Converts the comma-separated endpoint list into https URIs on the configured default port.
+     * Whitespace around each hostname is removed and empty entries are dropped.
+     */
+    private static List<URI> endpointUris(VespaConfiguration config) {
+        return Arrays.stream(config.endpoint().split(","))
+                .map(String::trim)
+                .filter(hostname -> !hostname.isEmpty())
+                .map(hostname -> URI.create(String.format("https://%s:%d/", hostname, config.defaultPort())))
+                .collect(toList());
+    }
+}
diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
index 7ca401a0cc8..c22d94d4bfc 100644
--- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
+++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java
@@ -1,14 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hadoop.pig;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Supplier;
-
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
+import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.mapred.Counters;
@@ -20,8 +14,12 @@ import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.junit.Test;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
public class VespaStorageTest {