diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-06-24 17:10:28 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-06-24 17:10:28 +0200 |
commit | 150aff6805eaa1b23a72bad7b01b89cee72ea6cf (patch) | |
tree | cfadc02f181f8665976a001591bd134caf5fef97 /vespa-hadoop | |
parent | 2f22aff9a94ce547809715e2d4ef72dc61a11426 (diff) |
Implement dryrun in FeedClient, use it from CLI and Hadoop feeder
Diffstat (limited to 'vespa-hadoop')
-rw-r--r-- | vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java | 60 |
1 files changed, 13 insertions, 47 deletions
diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java index 84dbe919c5c..9bd285b7cca 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java @@ -1,15 +1,10 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hadoop.mapreduce; -import ai.vespa.feed.client.DocumentId; -import ai.vespa.feed.client.DryrunResult; import ai.vespa.feed.client.FeedClient; import ai.vespa.feed.client.FeedClientBuilder; import ai.vespa.feed.client.JsonFeeder; import ai.vespa.feed.client.OperationParseException; -import ai.vespa.feed.client.OperationParameters; -import ai.vespa.feed.client.OperationStats; -import ai.vespa.feed.client.Result; import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; import com.yahoo.vespa.http.client.config.FeedParams; @@ -21,7 +16,6 @@ import java.net.URI; import java.time.Duration; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Level; import java.util.logging.Logger; @@ -132,22 +126,19 @@ public class VespaRecordWriter extends RecordWriter<Object, Object> { } private FeedClient createFeedClient() { - if (config.dryrun()) { - return new DryrunClient(); - } else { - List<URI> endpoints = endpointUris(config); - log.info("Using endpoints " + endpoints); - int streamsPerConnection = streamsPerConnection(config); - log.log(Level.INFO, "Using {0} max streams per connection", new Object[] {streamsPerConnection}); - log.log(Level.INFO, "Using {0} connections", new Object[] {config.numConnections()}); - FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpoints) - .setConnectionsPerEndpoint(config.numConnections()) - .setMaxStreamPerConnection(streamsPerConnection) - .setRetryStrategy(retryStrategy(config)); - - onFeedClientInitialization(feedClientBuilder); - return feedClientBuilder.build(); - } + List<URI> endpoints = endpointUris(config); + log.info("Using endpoints " + endpoints); + int streamsPerConnection = streamsPerConnection(config); + log.log(Level.INFO, "Using {0} max streams per connection", new Object[] {streamsPerConnection}); + log.log(Level.INFO, "Using {0} connections", new Object[] {config.numConnections()}); + FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpoints) + .setConnectionsPerEndpoint(config.numConnections()) + .setMaxStreamPerConnection(streamsPerConnection) + .setDryrun(config.dryrun()) + .setRetryStrategy(retryStrategy(config)); + + onFeedClientInitialization(feedClientBuilder); + return feedClientBuilder.build(); } private static FeedClient.RetryStrategy retryStrategy(VespaConfiguration config) { @@ -166,29 +157,4 @@ public class VespaRecordWriter extends RecordWriter<Object, Object> { .map(hostname -> URI.create(String.format("https://%s:%d/", hostname, config.defaultPort()))) .collect(toList()); } - - private static class DryrunClient implements FeedClient { - - @Override - public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) { - return createSuccessResult(documentId); - } - - @Override - public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) { - return createSuccessResult(documentId); - } - - @Override - public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) { - return createSuccessResult(documentId); - } - - @Override public OperationStats stats() { return null; } - @Override public void close(boolean graceful) {} - - private static CompletableFuture<Result> createSuccessResult(DocumentId documentId) { - return CompletableFuture.completedFuture(DryrunResult.create(Result.Type.success, documentId, "ok", null)); - } - } } |