summaryrefslogtreecommitdiffstats
path: root/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java')
-rw-r--r--vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java153
1 files changed, 0 insertions, 153 deletions
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
deleted file mode 100644
index c450d7cdeef..00000000000
--- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java
+++ /dev/null
@@ -1,153 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hadoop.mapreduce;
-
-import ai.vespa.feed.client.FeedClient;
-import ai.vespa.feed.client.FeedClientBuilder;
-import ai.vespa.feed.client.JsonFeeder;
-import ai.vespa.feed.client.OperationParseException;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration;
-import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-import java.net.URI;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import static java.util.stream.Collectors.toList;
-
-/**
- * {@link VespaRecordWriter} sends the output <key, value> to one or more Vespa endpoints using vespa-feed-client.
- *
- * @author bjorncs
- */
-public class VespaRecordWriter extends RecordWriter<Object, Object> {
-
- private final static Logger log = Logger.getLogger(VespaRecordWriter.class.getCanonicalName());
-
- private final VespaCounters counters;
- private final VespaConfiguration config;
-
- private boolean initialized = false;
- private JsonFeeder feeder;
-
- protected VespaRecordWriter(VespaConfiguration config, VespaCounters counters) {
- this.counters = counters;
- this.config = config;
- }
-
- @Override
- public void write(Object key, Object data) throws IOException {
- initializeOnFirstWrite();
- String json = data.toString().trim();
- feeder.feedSingle(json)
- .whenComplete((result, error) -> {
- if (error != null) {
- if (error instanceof OperationParseException) {
- counters.incrementDocumentsSkipped(1);
- } else {
- String msg = "Failed to feed single document: " + error;
- log.log(Level.WARNING, msg, error);
- counters.incrementDocumentsFailed(1);
- }
- } else {
- counters.incrementDocumentsOk(1);
- }
- });
- counters.incrementDocumentsSent(1);
- if (counters.getDocumentsSent() % config.progressInterval() == 0) {
- String progress = String.format("Feed progress: %d / %d / %d / %d (sent, ok, failed, skipped)",
- counters.getDocumentsSent(),
- counters.getDocumentsOk(),
- counters.getDocumentsFailed(),
- counters.getDocumentsSkipped());
- log.info(progress);
- }
- }
-
- @Override
- public void close(TaskAttemptContext context) throws IOException {
- if (feeder != null) {
- feeder.close();
- feeder = null;
- initialized = false;
- }
- }
-
- /** Override method to alter {@link FeedClient} configuration */
- protected void onFeedClientInitialization(FeedClientBuilder builder) {}
-
- private void initializeOnFirstWrite() {
- if (initialized) return;
- useRandomizedStartupDelayIfEnabled();
- feeder = createJsonStreamFeeder();
- initialized = true;
- }
-
- private void useRandomizedStartupDelayIfEnabled() {
- if (!config.dryrun() && config.randomStartupSleepMs() > 0) {
- int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs());
- log.info("Delaying startup by " + delay + " ms");
- try {
- Thread.sleep(delay);
- } catch (Exception e) {}
- }
- }
-
-
- private JsonFeeder createJsonStreamFeeder() {
- FeedClient feedClient = createFeedClient();
- JsonFeeder.Builder builder = JsonFeeder.builder(feedClient)
- .withTimeout(Duration.ofMinutes(10));
- if (config.route() != null) {
- builder.withRoute(config.route());
- }
- return builder.build();
-
- }
-
- private FeedClient createFeedClient() {
- List<URI> endpoints = endpointUris(config);
- log.info("Using endpoints " + endpoints);
- int streamsPerConnection = streamsPerConnection(config);
- log.log(Level.INFO, "Using {0} max streams per connection", new Object[] {streamsPerConnection});
- log.log(Level.INFO, "Using {0} connections", new Object[] {config.numConnections()});
- FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpoints)
- .setConnectionsPerEndpoint(config.numConnections())
- .setMaxStreamPerConnection(streamsPerConnection)
- .setDryrun(config.dryrun())
- .setRetryStrategy(retryStrategy(config));
- if (config.proxyHost() != null) {
- URI proxyUri = URI.create(String.format(
- "%s://%s:%d", config.proxyScheme(), config.proxyHost(), config.proxyPort()));
- log.info("Using proxy " + proxyUri);
- feedClientBuilder.setProxy(proxyUri);
- }
-
- onFeedClientInitialization(feedClientBuilder);
- return feedClientBuilder.build();
- }
-
- private static FeedClient.RetryStrategy retryStrategy(VespaConfiguration config) {
- int maxRetries = config.numRetries();
- return new FeedClient.RetryStrategy() {
- @Override public int retries() { return maxRetries; }
- };
- }
-
- private static int streamsPerConnection(VespaConfiguration config) {
- return Math.min(256, config.maxInFlightRequests() / config.numConnections());
- }
-
- private static List<URI> endpointUris(VespaConfiguration config) {
- String scheme = config.useSSL().orElse(true) ? "https" : "http";
- return Arrays.stream(config.endpoint().split(","))
- .map(hostname -> URI.create(String.format("%s://%s:%d/", scheme, hostname, config.defaultPort())))
- .collect(toList());
- }
-}