diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-06-04 17:22:45 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-06-10 16:45:37 +0200 |
commit | 95fad5a7d6f010e56290491109e219f69df4185b (patch) | |
tree | 525330c5472e45a9485c7893b6c0998de8bb39f2 /vespa-hadoop | |
parent | 627e960f3261f0d8fbcfcebeab9fb05d5a1c1400 (diff) |
Add new record writer based on vespa-feed-client
Diffstat (limited to 'vespa-hadoop')
4 files changed, 171 insertions, 11 deletions
diff --git a/vespa-hadoop/pom.xml b/vespa-hadoop/pom.xml index 382c28dc884..5fa2e4aa6e8 100644 --- a/vespa-hadoop/pom.xml +++ b/vespa-hadoop/pom.xml @@ -112,6 +112,12 @@ <artifactId>vespa-http-client</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>vespa-feed-client</artifactId> + <version>${project.version}</version> + <scope>compile</scope> + </dependency> </dependencies> diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java index 8b320fc9436..b716c55beb5 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/LegacyVespaRecordWriter.java @@ -36,7 +36,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Logger; /** - * {@link LegacyVespaRecordWriter} sends the output <key, value> to one or more Vespa endpoints. + * {@link LegacyVespaRecordWriter} sends the output <key, value> to one or more Vespa endpoints using vespa-http-client. * * @author lesters */ diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java new file mode 100644 index 00000000000..540f0aecf24 --- /dev/null +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java @@ -0,0 +1,156 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hadoop.mapreduce; + +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +<<<<<<< HEAD +import ai.vespa.feed.client.JsonStreamFeeder; +======= +import ai.vespa.feed.client.JsonFeeder; +import ai.vespa.feed.client.JsonParseException; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.OperationStats; +import ai.vespa.feed.client.Result; +>>>>>>> fadb8fc527 (fixup! Add new record writer based on vespa-feed-client) +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; +import com.yahoo.vespa.http.client.config.FeedParams; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.logging.Logger; + +import static java.util.stream.Collectors.toList; + +/** + * {@link VespaRecordWriter} sends the output <key, value> to one or more Vespa endpoints using vespa-feed-client. + * + * @author bjorncs + */ +public class VespaRecordWriter extends RecordWriter<Object, Object> { + + private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName()); + + private final VespaCounters counters; + private final VespaConfiguration config; + + private boolean initialized = false; + private JsonFeeder feeder; + + protected VespaRecordWriter(VespaConfiguration config, VespaCounters counters) { + this.counters = counters; + this.config = config; + } + + @Override + public void write(Object key, Object data) throws IOException { + initializeOnFirstWrite(); + String json = data.toString().trim(); + feeder.feedSingle(json) + .whenComplete((result, error) -> { + if (error != null) { + if (error instanceof JsonParseException) { + counters.incrementDocumentsSkipped(1); + } else { + log.warning("Failed to feed single document: " + error); + counters.incrementDocumentsFailed(1); + } + } else { + counters.incrementDocumentsOk(1); + } + }); + counters.incrementDocumentsSent(1); + if (counters.getDocumentsSent() % config.progressInterval() == 0) { + String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)", + counters.getDocumentsSent(), + counters.getDocumentsOk(), + counters.getDocumentsFailed(), + counters.getDocumentsSkipped()); + log.info(progress); + } + } + + @Override + public void close(TaskAttemptContext context) throws IOException { + if (feeder != null) { + feeder.close(); + feeder = null; + initialized = false; + } + } + + /** Override method to alter {@link FeedClient} configuration */ + protected void onFeedClientInitialization(FeedClientBuilder builder) {} + + private void initializeOnFirstWrite() { + if (initialized) return; + validateConfig(); + useRandomizedStartupDelayIfEnabled(); + feeder = createJsonStreamFeeder(); + initialized = true; + } + + private void validateConfig() { + if (!config.useSSL()) { + throw new IllegalArgumentException("SSL is required for this feed client implementation"); + } + if (config.dryrun()) { + throw new IllegalArgumentException("Dryrun is not supported for this feed client implementation"); + } + if (config.dataFormat() != FeedParams.DataFormat.JSON_UTF8) { + throw new IllegalArgumentException("Only JSON is support by this feed client implementation"); + } + if (config.proxyHost() != null) { + log.warning(String.format("Ignoring proxy config (host='%s', port=%d)", config.proxyHost(), config.proxyPort())); + } + } + + private void useRandomizedStartupDelayIfEnabled() { + if (config.randomStartupSleepMs() > 0) { + int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs()); + log.info("Delaying startup by " + delay + " ms"); + try { + Thread.sleep(delay); + } catch (Exception e) {} + } + } + + private JsonFeeder createJsonStreamFeeder() { + FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpointUris(config)) + .setConnectionsPerEndpoint(config.numConnections()) + .setMaxStreamPerConnection(streamsPerConnection(config)) + .setRetryStrategy(retryStrategy(config)); + + onFeedClientInitialization(feedClientBuilder); + FeedClient feedClient = feedClientBuilder.build(); + JsonFeeder.Builder builder = JsonFeeder.builder(feedClient) + .withTimeout(Duration.ofMinutes(10)); + if (config.route() != null) { + builder.withRoute(config.route()); + } + return builder.build(); + } + + private static FeedClient.RetryStrategy retryStrategy(VespaConfiguration config) { + int maxRetries = config.numRetries(); + return new FeedClient.RetryStrategy() { + @Override public int retries() { return maxRetries; } + }; + } + + private static int streamsPerConnection(VespaConfiguration config) { + return Math.min(256, config.maxInFlightRequests() / config.numConnections()); + } + + private static List<URI> endpointUris(VespaConfiguration config) { + return Arrays.stream(config.endpoint().split(",")) + .map(hostname -> URI.create(String.format("https://%s:%d/", hostname, config.defaultPort()))) + .collect(toList()); + } +} diff --git a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java index 7ca401a0cc8..c22d94d4bfc 100644 --- a/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java +++ b/vespa-hadoop/src/test/java/com/yahoo/vespa/hadoop/pig/VespaStorageTest.java @@ -1,14 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hadoop.pig; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Supplier; - +import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; +import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.mapred.Counters; @@ -20,8 +14,12 @@ import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.junit.Test; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; -import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class VespaStorageTest { |