From 150aff6805eaa1b23a72bad7b01b89cee72ea6cf Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Thu, 24 Jun 2021 17:10:28 +0200 Subject: Implement dryrun in FeedClient, use it from CLI and Hadoop feeder --- .../vespa/hadoop/mapreduce/VespaRecordWriter.java | 60 +++++----------------- 1 file changed, 13 insertions(+), 47 deletions(-) (limited to 'vespa-hadoop/src/main') diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java index 84dbe919c5c..9bd285b7cca 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java @@ -1,15 +1,10 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hadoop.mapreduce; -import ai.vespa.feed.client.DocumentId; -import ai.vespa.feed.client.DryrunResult; import ai.vespa.feed.client.FeedClient; import ai.vespa.feed.client.FeedClientBuilder; import ai.vespa.feed.client.JsonFeeder; import ai.vespa.feed.client.OperationParseException; -import ai.vespa.feed.client.OperationParameters; -import ai.vespa.feed.client.OperationStats; -import ai.vespa.feed.client.Result; import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; import com.yahoo.vespa.http.client.config.FeedParams; @@ -21,7 +16,6 @@ import java.net.URI; import java.time.Duration; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Level; import java.util.logging.Logger; @@ -132,22 +126,19 @@ public class VespaRecordWriter extends RecordWriter { } private FeedClient createFeedClient() { - if (config.dryrun()) { - return new DryrunClient(); - } else { - List endpoints = endpointUris(config); - log.info("Using endpoints " + endpoints); - int streamsPerConnection = streamsPerConnection(config); - log.log(Level.INFO, "Using {0} max streams per connection", new Object[] {streamsPerConnection}); - log.log(Level.INFO, "Using {0} connections", new Object[] {config.numConnections()}); - FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpoints) - .setConnectionsPerEndpoint(config.numConnections()) - .setMaxStreamPerConnection(streamsPerConnection) - .setRetryStrategy(retryStrategy(config)); - - onFeedClientInitialization(feedClientBuilder); - return feedClientBuilder.build(); - } + List endpoints = endpointUris(config); + log.info("Using endpoints " + endpoints); + int streamsPerConnection = streamsPerConnection(config); + log.log(Level.INFO, "Using {0} max streams per connection", new Object[] {streamsPerConnection}); + log.log(Level.INFO, "Using {0} connections", new Object[] {config.numConnections()}); + FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpoints) + .setConnectionsPerEndpoint(config.numConnections()) + .setMaxStreamPerConnection(streamsPerConnection) + .setDryrun(config.dryrun()) + .setRetryStrategy(retryStrategy(config)); + + onFeedClientInitialization(feedClientBuilder); + return feedClientBuilder.build(); } private static FeedClient.RetryStrategy retryStrategy(VespaConfiguration config) { @@ -166,29 +157,4 @@ public class VespaRecordWriter extends RecordWriter { .map(hostname -> URI.create(String.format("https://%s:%d/", hostname, config.defaultPort()))) .collect(toList()); } - - private static class DryrunClient implements FeedClient { - - @Override - public CompletableFuture put(DocumentId documentId, String documentJson, OperationParameters params) { - return createSuccessResult(documentId); - } - - @Override - public CompletableFuture update(DocumentId documentId, String updateJson, OperationParameters params) { - return createSuccessResult(documentId); - } - - @Override - public CompletableFuture remove(DocumentId documentId, OperationParameters params) { - return createSuccessResult(documentId); - } - - @Override public OperationStats stats() { return null; } - @Override public void close(boolean graceful) {} - - private static CompletableFuture createSuccessResult(DocumentId documentId) { - return CompletableFuture.completedFuture(DryrunResult.create(Result.Type.success, documentId, "ok", null)); - } - } } -- cgit v1.2.3