From f4de7094036cb8d2b1b93822f16e08b60afd5b22 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Fri, 4 Jun 2021 17:36:32 +0200 Subject: Add dryrun implementation of FeedClient --- .../java/ai/vespa/feed/client/DryrunResult.java | 18 +++++++ .../vespa/hadoop/mapreduce/VespaRecordWriter.java | 63 ++++++++++++++++------ 2 files changed, 65 insertions(+), 16 deletions(-) create mode 100644 vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java (limited to 'vespa-hadoop/src') diff --git a/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java b/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java new file mode 100644 index 00000000000..5974a8df271 --- /dev/null +++ b/vespa-hadoop/src/main/java/ai/vespa/feed/client/DryrunResult.java @@ -0,0 +1,18 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import ai.vespa.feed.client.Result.Type; + +/** + * Workaround for package-private {@link Result} constructor. + * + * @author bjorncs + */ +public class DryrunResult { + + private DryrunResult() {} + + public static Result create(Type type, DocumentId documentId, String resultMessage, String traceMessage) { + return new Result(type, documentId, resultMessage, traceMessage); + } +} diff --git a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java index 540f0aecf24..73e10c39419 100644 --- a/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java +++ b/vespa-hadoop/src/main/java/com/yahoo/vespa/hadoop/mapreduce/VespaRecordWriter.java @@ -1,17 +1,15 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hadoop.mapreduce; +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.DryrunResult; import ai.vespa.feed.client.FeedClient; import ai.vespa.feed.client.FeedClientBuilder; -<<<<<<< HEAD -import ai.vespa.feed.client.JsonStreamFeeder; -======= import ai.vespa.feed.client.JsonFeeder; import ai.vespa.feed.client.JsonParseException; import ai.vespa.feed.client.OperationParameters; import ai.vespa.feed.client.OperationStats; import ai.vespa.feed.client.Result; ->>>>>>> fadb8fc527 (fixup! Add new record writer based on vespa-feed-client) import com.yahoo.vespa.hadoop.mapreduce.util.VespaConfiguration; import com.yahoo.vespa.hadoop.mapreduce.util.VespaCounters; import com.yahoo.vespa.http.client.config.FeedParams; @@ -23,6 +21,7 @@ import java.net.URI; import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Logger; @@ -100,9 +99,6 @@ public class VespaRecordWriter extends RecordWriter { if (!config.useSSL()) { throw new IllegalArgumentException("SSL is required for this feed client implementation"); } - if (config.dryrun()) { - throw new IllegalArgumentException("Dryrun is not supported for this feed client implementation"); - } if (config.dataFormat() != FeedParams.DataFormat.JSON_UTF8) { throw new IllegalArgumentException("Only JSON is support by this feed client implementation"); } @@ -112,7 +108,7 @@ public class VespaRecordWriter extends RecordWriter { } private void useRandomizedStartupDelayIfEnabled() { - if (config.randomStartupSleepMs() > 0) { + if (!config.dryrun() && config.randomStartupSleepMs() > 0) { int delay = ThreadLocalRandom.current().nextInt(config.randomStartupSleepMs()); log.info("Delaying startup by " + delay + " ms"); try { @@ -121,20 +117,30 @@ public class VespaRecordWriter extends RecordWriter { } } + private JsonFeeder createJsonStreamFeeder() { - FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpointUris(config)) - .setConnectionsPerEndpoint(config.numConnections()) - .setMaxStreamPerConnection(streamsPerConnection(config)) - .setRetryStrategy(retryStrategy(config)); - - onFeedClientInitialization(feedClientBuilder); - FeedClient feedClient = feedClientBuilder.build(); - JsonFeeder.Builder builder = JsonFeeder.builder(feedClient) + FeedClient feedClient = createFeedClient(); + JsonFeeder.Builder builder = JsonFeeder.builder(feedClient) .withTimeout(Duration.ofMinutes(10)); if (config.route() != null) { builder.withRoute(config.route()); } return builder.build(); + + } + + private FeedClient createFeedClient() { + if (config.dryrun()) { + return new DryrunClient(); + } else { + FeedClientBuilder feedClientBuilder = FeedClientBuilder.create(endpointUris(config)) + .setConnectionsPerEndpoint(config.numConnections()) + .setMaxStreamPerConnection(streamsPerConnection(config)) + .setRetryStrategy(retryStrategy(config)); + + onFeedClientInitialization(feedClientBuilder); + return feedClientBuilder.build(); + } } private static FeedClient.RetryStrategy retryStrategy(VespaConfiguration config) { @@ -153,4 +159,29 @@ public class VespaRecordWriter extends RecordWriter { .map(hostname -> URI.create(String.format("https://%s:%d/", hostname, config.defaultPort()))) .collect(toList()); } + + private static class DryrunClient implements FeedClient { + + @Override + public CompletableFuture put(DocumentId documentId, String documentJson, OperationParameters params) { + return createSuccessResult(documentId); + } + + @Override + public CompletableFuture update(DocumentId documentId, String updateJson, OperationParameters params) { + return createSuccessResult(documentId); + } + + @Override + public CompletableFuture remove(DocumentId documentId, OperationParameters params) { + return createSuccessResult(documentId); + } + + @Override public OperationStats stats() { return null; } + @Override public void close(boolean graceful) {} + + private static CompletableFuture createSuccessResult(DocumentId documentId) { + return CompletableFuture.completedFuture(DryrunResult.create(Result.Type.success, documentId, "ok", null)); + } + } } -- cgit v1.2.3