diff options
7 files changed, 206 insertions, 52 deletions
diff --git a/vespa-feed-client-cli/pom.xml b/vespa-feed-client-cli/pom.xml index 62ff5c149ec..28e1ab01dae 100644 --- a/vespa-feed-client-cli/pom.xml +++ b/vespa-feed-client-cli/pom.xml @@ -31,6 +31,11 @@ <artifactId>commons-cli</artifactId> <scope>compile</scope> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <scope>compile</scope> + </dependency> <!-- test scope --> <dependency> diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliArguments.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliArguments.java index 33c3d0e7894..8855b08b06b 100644 --- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliArguments.java +++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliArguments.java @@ -16,10 +16,12 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Path; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.OptionalDouble; import java.util.OptionalInt; /** @@ -42,11 +44,17 @@ class CliArguments { private static final String HELP_OPTION = "help"; private static final String MAX_STREAMS_PER_CONNECTION = "max-streams-per-connection"; private static final String PRIVATE_KEY_OPTION = "private-key"; + private static final String ROUTE_OPTION = "route"; + private static final String TIMEOUT_OPTION = "timeout"; + private static final String TRACE_OPTION = "trace"; + private static final String VERBOSE_OPTION = "verbose"; private static final String VERSION_OPTION = "version"; + private static final String STDIN_OPTION = "stdin"; private final CommandLine arguments; - private CliArguments(CommandLine arguments) { + private CliArguments(CommandLine arguments) throws CliArgumentsException { + validateArgumentCombination(arguments); this.arguments = arguments; } @@ -59,11 +67,26 @@ class CliArguments { } } + private static void validateArgumentCombination(CommandLine args) throws CliArgumentsException { + if (!args.hasOption(HELP_OPTION) && !args.hasOption(VERSION_OPTION)) { + if (!args.hasOption(ENDPOINT_OPTION)) { + throw new CliArgumentsException("Endpoint must be specified"); + } + if (args.hasOption(FILE_OPTION) == args.hasOption(STDIN_OPTION)) { + throw new CliArgumentsException(String.format("Either option '%s' or '%s' must be specified", FILE_OPTION, STDIN_OPTION)); + } + if (args.hasOption(CERTIFICATE_OPTION) != args.hasOption(PRIVATE_KEY_OPTION)) { + throw new CliArgumentsException( + String.format("Both '%s' and '%s' must be specified together", CERTIFICATE_OPTION, PRIVATE_KEY_OPTION)); + } + } else if (args.hasOption(HELP_OPTION) && args.hasOption(VERSION_OPTION)) { + throw new CliArgumentsException(String.format("Cannot specify both '%s' and '%s'", HELP_OPTION, VERSION_OPTION)); + } + } + URI endpoint() throws CliArgumentsException { try { - URL url = (URL) arguments.getParsedOptionValue(ENDPOINT_OPTION); - if (url == null) throw new CliArgumentsException("Endpoint must be specified"); - return url.toURI(); + return ((URL) arguments.getParsedOptionValue(ENDPOINT_OPTION)).toURI(); } catch (ParseException | URISyntaxException e) { throw new CliArgumentsException("Invalid endpoint: " + e.getMessage(), e); } @@ -80,18 +103,14 @@ class CliArguments { Optional<CertificateAndKey> certificateAndKey() throws CliArgumentsException { Path certificateFile = fileValue(CERTIFICATE_OPTION).orElse(null); Path privateKeyFile = fileValue(PRIVATE_KEY_OPTION).orElse(null); - if ((certificateFile == null) != (privateKeyFile == null)) { - throw new CliArgumentsException(String.format("Both '%s' and '%s' must be specified together", CERTIFICATE_OPTION, PRIVATE_KEY_OPTION)); - } if (privateKeyFile == null && certificateFile == null) return Optional.empty(); return Optional.of(new CertificateAndKey(certificateFile, privateKeyFile)); } Optional<Path> caCertificates() throws CliArgumentsException { return fileValue(CA_CERTIFICATES_OPTION); } - Path inputFile() throws CliArgumentsException { - return fileValue(FILE_OPTION) - .orElseThrow(() -> new CliArgumentsException("Feed file must be specified")); + Optional<Path> inputFile() throws CliArgumentsException { + return fileValue(FILE_OPTION); } Map<String, String> headers() throws CliArgumentsException { @@ -116,12 +135,27 @@ class CliArguments { boolean benchmarkModeEnabled() { return has(BENCHMARK_OPTION); } + Optional<String> route() { return stringValue(ROUTE_OPTION); } + + OptionalInt traceLevel() throws CliArgumentsException { return intValue(TRACE_OPTION); } + + Optional<Duration> timeout() throws CliArgumentsException { + OptionalDouble timeout = doubleValue(TIMEOUT_OPTION); + return timeout.isPresent() + ? Optional.of(Duration.ofMillis((long)(timeout.getAsDouble()*1000))) + : Optional.empty(); + } + + boolean verboseSpecified() { return has(VERBOSE_OPTION); } + + boolean readFeedFromStandardInput() { return has(STDIN_OPTION); } + private OptionalInt intValue(String option) throws CliArgumentsException { try { Number number = (Number) arguments.getParsedOptionValue(option); return number != null ? OptionalInt.of(number.intValue()) : OptionalInt.empty(); } catch (ParseException e) { - throw new CliArgumentsException(String.format("Invalid value for '%s': %s", option, e.getMessage()), e); + throw newInvalidValueException(option, e); } } @@ -131,12 +165,27 @@ class CliArguments { if (certificateFile == null) return Optional.empty(); return Optional.of(certificateFile.toPath()); } catch (ParseException e) { - throw new CliArgumentsException(String.format("Invalid value for '%s': %s", option, e.getMessage()), e); + throw newInvalidValueException(option, e); + } + } + + private Optional<String> stringValue(String option) { return Optional.ofNullable(arguments.getOptionValue(option)); } + + private OptionalDouble doubleValue(String option) throws CliArgumentsException { + try { + Number number = (Number) arguments.getParsedOptionValue(option); + return number != null ? OptionalDouble.of(number.doubleValue()) : OptionalDouble.empty(); + } catch (ParseException e) { + throw newInvalidValueException(option, e); } } private boolean has(String option) { return arguments.hasOption(option); } + private static CliArgumentsException newInvalidValueException(String option, ParseException cause) { + return new CliArgumentsException(String.format("Invalid value for '%s': %s", option, cause.getMessage()), cause); + } + private static Options createOptions() { // TODO Add description to each option return new Options() @@ -171,11 +220,6 @@ class CliArguments { .type(Number.class) .build()) .addOption(Option.builder() - .longOpt(CONNECTIONS_OPTION) - .hasArg() - .type(Number.class) - .build()) - .addOption(Option.builder() .longOpt(CERTIFICATE_OPTION) .type(File.class) .hasArg() @@ -195,6 +239,26 @@ class CliArguments { .build()) .addOption(Option.builder() .longOpt(BENCHMARK_OPTION) + .build()) + .addOption(Option.builder() + .longOpt(ROUTE_OPTION) + .hasArg() + .build()) + .addOption(Option.builder() + .longOpt(TIMEOUT_OPTION) + .hasArg() + .type(Number.class) + .build()) + .addOption(Option.builder() + .longOpt(TRACE_OPTION) + .hasArg() + .type(Number.class) + .build()) + .addOption(Option.builder() + .longOpt(STDIN_OPTION) + .build()) + .addOption(Option.builder() + .longOpt(VERBOSE_OPTION) .build()); } diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java index 27ecada98c3..f30d44ba4f3 100644 --- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java +++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java @@ -1,12 +1,16 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; + import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLSession; +import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; -import java.nio.file.Path; +import java.nio.file.Files; import java.util.Properties; /** @@ -35,8 +39,9 @@ public class CliClient { } private int run(String[] rawArgs) { + CliArguments cliArgs = null; try { - CliArguments cliArgs = CliArguments.fromRawArgs(rawArgs); + cliArgs = CliArguments.fromRawArgs(rawArgs); if (cliArgs.helpSpecified()) { cliArgs.printHelp(systemOut); return 0; @@ -45,41 +50,67 @@ public class CliClient { systemOut.println(Vespa.VERSION); return 0; } - FeedClient feedClient = createFeedClient(cliArgs); + try (InputStream in = createFeedInputStream(cliArgs); + JsonStreamFeeder feeder = createJsonFeeder(cliArgs)) { + if (cliArgs.benchmarkModeEnabled()) { + printBenchmarkResult(feeder.benchmark(in)); + } else { + feeder.feed(in); + } + } return 0; } catch (CliArguments.CliArgumentsException | IOException e) { - return handleException(e); + boolean verbose = cliArgs != null && cliArgs.verboseSpecified(); + return handleException(verbose, e); + } catch (Exception e) { + boolean verbose = cliArgs != null && cliArgs.verboseSpecified(); + return handleException(verbose, "Unknown failure: " + e.getMessage(), e); } } - private static FeedClient createFeedClient(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException { + private static FeedClient createFeedClient(CliArguments cliArgs) throws CliArguments.CliArgumentsException { FeedClientBuilder builder = FeedClientBuilder.create(cliArgs.endpoint()); cliArgs.connections().ifPresent(builder::setMaxConnections); cliArgs.maxStreamsPerConnection().ifPresent(builder::setMaxConnections); if (cliArgs.sslHostnameVerificationDisabled()) { builder.setHostnameVerifier(AcceptAllHostnameVerifier.INSTANCE); } - CliArguments.CertificateAndKey certificateAndKey = cliArgs.certificateAndKey().orElse(null); - Path caCertificates = cliArgs.caCertificates().orElse(null); - if (certificateAndKey != null || caCertificates != null) { - SslContextBuilder sslContextBuilder = new SslContextBuilder(); - if (certificateAndKey != null) { - sslContextBuilder.withCertificateAndKey(certificateAndKey.certificateFile, certificateAndKey.privateKeyFile); - } - if (caCertificates != null) { - sslContextBuilder.withCaCertificates(caCertificates); - } - builder.setSslContext(sslContextBuilder.build()); - } + cliArgs.certificateAndKey().ifPresent(c -> builder.setCertificate(c.certificateFile, c.privateKeyFile)); + cliArgs.caCertificates().ifPresent(builder::setCaCertificates); cliArgs.headers().forEach(builder::addRequestHeader); return builder.build(); } - private int handleException(Exception e) { return handleException(e.getMessage(), e); } + private static JsonStreamFeeder createJsonFeeder(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException { + FeedClient feedClient = createFeedClient(cliArgs); + JsonStreamFeeder.Builder builder = JsonStreamFeeder.builder(feedClient); + cliArgs.timeout().ifPresent(builder::withTimeout); + cliArgs.route().ifPresent(builder::withRoute); + cliArgs.traceLevel().ifPresent(builder::withTracelevel); + return builder.build(); + } + + private InputStream createFeedInputStream(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException { + return cliArgs.readFeedFromStandardInput() ? systemIn : Files.newInputStream(cliArgs.inputFile().get()); + } + + private void printBenchmarkResult(JsonStreamFeeder.BenchmarkResult result) throws IOException { + JsonFactory factory = new JsonFactory(); + try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) { + generator.writeStartObject(); + generator.writeNumberField("feeder.runtime", result.duration.toMillis()); + generator.writeNumberField("feeder.okcount", result.okCount); + generator.writeNumberField("feeder.errorcount", result.errorCount); + generator.writeNumberField("feeder.throughput", result.throughput); + generator.writeEndObject(); + } + } + + private int handleException(boolean verbose, Exception e) { return handleException(verbose, e.getMessage(), e); } - private int handleException(String message, Exception exception) { + private int handleException(boolean verbose, String message, Exception exception) { systemError.println(message); - if (debugMode()) { + if (debugMode() || verbose) { exception.printStackTrace(systemError); } return 1; diff --git a/vespa-feed-client-cli/src/test/java/ai/vespa/feed/client/CliArgumentsTest.java b/vespa-feed-client-cli/src/test/java/ai/vespa/feed/client/CliArgumentsTest.java index 33ee31ff0dc..a99399e638d 100644 --- a/vespa-feed-client-cli/src/test/java/ai/vespa/feed/client/CliArgumentsTest.java +++ b/vespa-feed-client-cli/src/test/java/ai/vespa/feed/client/CliArgumentsTest.java @@ -7,6 +7,7 @@ import java.io.IOException; import java.net.URI; import java.nio.file.Files; import java.nio.file.Paths; +import java.time.Duration; import static org.junit.jupiter.api.Assertions.*; @@ -21,9 +22,10 @@ class CliArgumentsTest { "--endpoint=https://vespa.ai:4443/", "--file=feed.json", "--connections=10", "--max-streams-per-connection=128", "--certificate=cert.pem", "--private-key=key.pem", "--ca-certificates=ca-certs.pem", "--disable-ssl-hostname-verification", - "--header=\"My-Header: my-value\"", "--header", "Another-Header: another-value", "--benchmark"}); + "--header=\"My-Header: my-value\"", "--header", "Another-Header: another-value", "--benchmark", + "--route=myroute", "--timeout=0.125", "--trace=9", "--verbose"}); assertEquals(URI.create("https://vespa.ai:4443/"), args.endpoint()); - assertEquals(Paths.get("feed.json"), args.inputFile()); + assertEquals(Paths.get("feed.json"), args.inputFile().get()); assertEquals(10, args.connections().getAsInt()); assertEquals(128, args.maxStreamsPerConnection().getAsInt()); assertEquals(Paths.get("cert.pem"), args.certificateAndKey().get().certificateFile); @@ -36,15 +38,31 @@ class CliArgumentsTest { assertEquals("my-value", args.headers().get("My-Header")); assertEquals("another-value", args.headers().get("Another-Header")); assertTrue(args.benchmarkModeEnabled()); + assertEquals("myroute", args.route().get()); + assertEquals(Duration.ofMillis(125), args.timeout().get()); + assertEquals(9, args.traceLevel().getAsInt()); + assertTrue(args.verboseSpecified()); } @Test - void fails_on_missing_parameters() throws CliArguments.CliArgumentsException { - CliArguments cliArguments = CliArguments.fromRawArgs(new String[0]); - CliArguments.CliArgumentsException exception = assertThrows(CliArguments.CliArgumentsException.class, cliArguments::endpoint); + void fails_on_missing_parameters() { + CliArguments.CliArgumentsException exception = assertThrows( + CliArguments.CliArgumentsException.class, + () -> CliArguments.fromRawArgs(new String[] {"--file=/path/to/file", "--stdin"})); assertEquals("Endpoint must be specified", exception.getMessage()); - exception = assertThrows(CliArguments.CliArgumentsException.class, cliArguments::inputFile); - assertEquals("Feed file must be specified", exception.getMessage()); + } + + @Test + void fails_on_conflicting_parameters() { + CliArguments.CliArgumentsException exception = assertThrows( + CliArguments.CliArgumentsException.class, + () -> CliArguments.fromRawArgs(new String[] {"--endpoint=https://endpoint", "--file=/path/to/file", "--stdin"})); + assertEquals("Either option 'file' or 'stdin' must be specified", exception.getMessage()); + + exception = assertThrows( + CliArguments.CliArgumentsException.class, + () -> CliArguments.fromRawArgs(new String[] {"--endpoint=https://endpoint"})); + assertEquals("Either option 'file' or 'stdin' must be specified", exception.getMessage()); } @Test diff --git a/vespa-feed-client-cli/src/test/resources/help.txt b/vespa-feed-client-cli/src/test/resources/help.txt index 9ad7642d4ec..29f24e8c672 100644 --- a/vespa-feed-client-cli/src/test/resources/help.txt +++ b/vespa-feed-client-cli/src/test/resources/help.txt @@ -11,4 +11,9 @@ Vespa feed client --help --max-streams-per-connection <arg> --private-key <arg> + --route <arg> + --stdin + --timeout <arg> + --trace <arg> + --verbose --version diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java index 74965270fb9..17162f19d3f 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java @@ -6,12 +6,15 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; import java.io.UncheckedIOException; import java.time.Duration; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static ai.vespa.feed.client.FeedClient.OperationType.put; @@ -28,7 +31,7 @@ import static java.util.Objects.requireNonNull; /** * @author jonmv */ -public class JsonStreamFeeder { +public class JsonStreamFeeder implements Closeable { private final FeedClient client; private final OperationParameters protoParameters; @@ -66,24 +69,36 @@ public class JsonStreamFeeder { * Note that {@code "id"} is an alias for the document put operation. */ public void feed(InputStream jsonStream) throws IOException { - feed(jsonStream, 1 << 26); + feed(jsonStream, 1 << 26, false); } - void feed(InputStream jsonStream, int size) throws IOException { + BenchmarkResult benchmark(InputStream jsonStream) throws IOException { + return feed(jsonStream, 1 << 26, true).get(); + } + + Optional<BenchmarkResult> feed(InputStream jsonStream, int size, boolean benchmark) throws IOException { RingBufferStream buffer = new RingBufferStream(jsonStream, size); buffer.expect(JsonToken.START_ARRAY); + AtomicInteger okCount = new AtomicInteger(); + AtomicInteger failedCount = new AtomicInteger(); + long startTime = System.nanoTime(); CompletableFuture<Result> result; AtomicReference<Throwable> thrown = new AtomicReference<>(); while ((result = buffer.next()) != null) { result.whenComplete((r, t) -> { - if (t != null) - thrown.set(t); - else - ; // Aggregate stats. + if (t != null) { + failedCount.incrementAndGet(); + if (!benchmark) thrown.set(t); + } else + okCount.incrementAndGet(); }); if (thrown.get() != null) sneakyThrow(thrown.get()); } + if (!benchmark) return Optional.empty(); + Duration duration = Duration.ofNanos(System.nanoTime() - startTime); + double throughPut = (double)okCount.get() / duration.toMillis() * 1000D; + return Optional.of(new BenchmarkResult(okCount.get(), failedCount.get(), duration, throughPut)); } @SuppressWarnings("unchecked") @@ -91,6 +106,8 @@ public class JsonStreamFeeder { private static final JsonFactory factory = new JsonFactory(); + @Override public void close() throws IOException { client.close(); } + private class RingBufferStream extends InputStream { private final byte[] b = new byte[1]; @@ -330,4 +347,18 @@ public class JsonStreamFeeder { } + static class BenchmarkResult { + final int okCount; + final int errorCount; + final Duration duration; + final double throughput; + + BenchmarkResult(int okCount, int errorCount, Duration duration, double throughput) { + this.okCount = okCount; + this.errorCount = errorCount; + this.duration = duration; + this.throughput = throughput; + } + } + } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java index 25f64e3c98a..8ef8ae57f5e 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java @@ -60,7 +60,7 @@ class JsonStreamFeederTest { public void close() throws IOException { } - }).build().feed(in, 1 << 7); // TODO: hangs on 1 << 6. + }).build().feed(in, 1 << 7, false); // TODO: hangs on 1 << 6. assertEquals(docs + 1, ids.size()); } |