aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--vespa-feed-client-cli/pom.xml5
-rw-r--r--vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliArguments.java98
-rw-r--r--vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java71
-rw-r--r--vespa-feed-client-cli/src/test/java/ai/vespa/feed/client/CliArgumentsTest.java32
-rw-r--r--vespa-feed-client-cli/src/test/resources/help.txt5
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java45
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java2
7 files changed, 206 insertions, 52 deletions
diff --git a/vespa-feed-client-cli/pom.xml b/vespa-feed-client-cli/pom.xml
index 62ff5c149ec..28e1ab01dae 100644
--- a/vespa-feed-client-cli/pom.xml
+++ b/vespa-feed-client-cli/pom.xml
@@ -31,6 +31,11 @@
<artifactId>commons-cli</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <scope>compile</scope>
+ </dependency>
<!-- test scope -->
<dependency>
diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliArguments.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliArguments.java
index 33c3d0e7894..8855b08b06b 100644
--- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliArguments.java
+++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliArguments.java
@@ -16,10 +16,12 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalDouble;
import java.util.OptionalInt;
/**
@@ -42,11 +44,17 @@ class CliArguments {
private static final String HELP_OPTION = "help";
private static final String MAX_STREAMS_PER_CONNECTION = "max-streams-per-connection";
private static final String PRIVATE_KEY_OPTION = "private-key";
+ private static final String ROUTE_OPTION = "route";
+ private static final String TIMEOUT_OPTION = "timeout";
+ private static final String TRACE_OPTION = "trace";
+ private static final String VERBOSE_OPTION = "verbose";
private static final String VERSION_OPTION = "version";
+ private static final String STDIN_OPTION = "stdin";
private final CommandLine arguments;
- private CliArguments(CommandLine arguments) {
+ private CliArguments(CommandLine arguments) throws CliArgumentsException {
+ validateArgumentCombination(arguments);
this.arguments = arguments;
}
@@ -59,11 +67,26 @@ class CliArguments {
}
}
+ private static void validateArgumentCombination(CommandLine args) throws CliArgumentsException {
+ if (!args.hasOption(HELP_OPTION) && !args.hasOption(VERSION_OPTION)) {
+ if (!args.hasOption(ENDPOINT_OPTION)) {
+ throw new CliArgumentsException("Endpoint must be specified");
+ }
+ if (args.hasOption(FILE_OPTION) == args.hasOption(STDIN_OPTION)) {
+ throw new CliArgumentsException(String.format("Either option '%s' or '%s' must be specified", FILE_OPTION, STDIN_OPTION));
+ }
+ if (args.hasOption(CERTIFICATE_OPTION) != args.hasOption(PRIVATE_KEY_OPTION)) {
+ throw new CliArgumentsException(
+ String.format("Both '%s' and '%s' must be specified together", CERTIFICATE_OPTION, PRIVATE_KEY_OPTION));
+ }
+ } else if (args.hasOption(HELP_OPTION) && args.hasOption(VERSION_OPTION)) {
+ throw new CliArgumentsException(String.format("Cannot specify both '%s' and '%s'", HELP_OPTION, VERSION_OPTION));
+ }
+ }
+
URI endpoint() throws CliArgumentsException {
try {
- URL url = (URL) arguments.getParsedOptionValue(ENDPOINT_OPTION);
- if (url == null) throw new CliArgumentsException("Endpoint must be specified");
- return url.toURI();
+ return ((URL) arguments.getParsedOptionValue(ENDPOINT_OPTION)).toURI();
} catch (ParseException | URISyntaxException e) {
throw new CliArgumentsException("Invalid endpoint: " + e.getMessage(), e);
}
@@ -80,18 +103,14 @@ class CliArguments {
Optional<CertificateAndKey> certificateAndKey() throws CliArgumentsException {
Path certificateFile = fileValue(CERTIFICATE_OPTION).orElse(null);
Path privateKeyFile = fileValue(PRIVATE_KEY_OPTION).orElse(null);
- if ((certificateFile == null) != (privateKeyFile == null)) {
- throw new CliArgumentsException(String.format("Both '%s' and '%s' must be specified together", CERTIFICATE_OPTION, PRIVATE_KEY_OPTION));
- }
if (privateKeyFile == null && certificateFile == null) return Optional.empty();
return Optional.of(new CertificateAndKey(certificateFile, privateKeyFile));
}
Optional<Path> caCertificates() throws CliArgumentsException { return fileValue(CA_CERTIFICATES_OPTION); }
- Path inputFile() throws CliArgumentsException {
- return fileValue(FILE_OPTION)
- .orElseThrow(() -> new CliArgumentsException("Feed file must be specified"));
+ Optional<Path> inputFile() throws CliArgumentsException {
+ return fileValue(FILE_OPTION);
}
Map<String, String> headers() throws CliArgumentsException {
@@ -116,12 +135,27 @@ class CliArguments {
boolean benchmarkModeEnabled() { return has(BENCHMARK_OPTION); }
+ Optional<String> route() { return stringValue(ROUTE_OPTION); }
+
+ OptionalInt traceLevel() throws CliArgumentsException { return intValue(TRACE_OPTION); }
+
+ Optional<Duration> timeout() throws CliArgumentsException {
+ OptionalDouble timeout = doubleValue(TIMEOUT_OPTION);
+ return timeout.isPresent()
+ ? Optional.of(Duration.ofMillis((long)(timeout.getAsDouble()*1000)))
+ : Optional.empty();
+ }
+
+ boolean verboseSpecified() { return has(VERBOSE_OPTION); }
+
+ boolean readFeedFromStandardInput() { return has(STDIN_OPTION); }
+
private OptionalInt intValue(String option) throws CliArgumentsException {
try {
Number number = (Number) arguments.getParsedOptionValue(option);
return number != null ? OptionalInt.of(number.intValue()) : OptionalInt.empty();
} catch (ParseException e) {
- throw new CliArgumentsException(String.format("Invalid value for '%s': %s", option, e.getMessage()), e);
+ throw newInvalidValueException(option, e);
}
}
@@ -131,12 +165,27 @@ class CliArguments {
if (certificateFile == null) return Optional.empty();
return Optional.of(certificateFile.toPath());
} catch (ParseException e) {
- throw new CliArgumentsException(String.format("Invalid value for '%s': %s", option, e.getMessage()), e);
+ throw newInvalidValueException(option, e);
+ }
+ }
+
+ private Optional<String> stringValue(String option) { return Optional.ofNullable(arguments.getOptionValue(option)); }
+
+ private OptionalDouble doubleValue(String option) throws CliArgumentsException {
+ try {
+ Number number = (Number) arguments.getParsedOptionValue(option);
+ return number != null ? OptionalDouble.of(number.doubleValue()) : OptionalDouble.empty();
+ } catch (ParseException e) {
+ throw newInvalidValueException(option, e);
}
}
private boolean has(String option) { return arguments.hasOption(option); }
+ private static CliArgumentsException newInvalidValueException(String option, ParseException cause) {
+ return new CliArgumentsException(String.format("Invalid value for '%s': %s", option, cause.getMessage()), cause);
+ }
+
private static Options createOptions() {
// TODO Add description to each option
return new Options()
@@ -171,11 +220,6 @@ class CliArguments {
.type(Number.class)
.build())
.addOption(Option.builder()
- .longOpt(CONNECTIONS_OPTION)
- .hasArg()
- .type(Number.class)
- .build())
- .addOption(Option.builder()
.longOpt(CERTIFICATE_OPTION)
.type(File.class)
.hasArg()
@@ -195,6 +239,26 @@ class CliArguments {
.build())
.addOption(Option.builder()
.longOpt(BENCHMARK_OPTION)
+ .build())
+ .addOption(Option.builder()
+ .longOpt(ROUTE_OPTION)
+ .hasArg()
+ .build())
+ .addOption(Option.builder()
+ .longOpt(TIMEOUT_OPTION)
+ .hasArg()
+ .type(Number.class)
+ .build())
+ .addOption(Option.builder()
+ .longOpt(TRACE_OPTION)
+ .hasArg()
+ .type(Number.class)
+ .build())
+ .addOption(Option.builder()
+ .longOpt(STDIN_OPTION)
+ .build())
+ .addOption(Option.builder()
+ .longOpt(VERBOSE_OPTION)
.build());
}
diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
index 27ecada98c3..f30d44ba4f3 100644
--- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
+++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
@@ -1,12 +1,16 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLSession;
+import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
-import java.nio.file.Path;
+import java.nio.file.Files;
import java.util.Properties;
/**
@@ -35,8 +39,9 @@ public class CliClient {
}
private int run(String[] rawArgs) {
+ CliArguments cliArgs = null;
try {
- CliArguments cliArgs = CliArguments.fromRawArgs(rawArgs);
+ cliArgs = CliArguments.fromRawArgs(rawArgs);
if (cliArgs.helpSpecified()) {
cliArgs.printHelp(systemOut);
return 0;
@@ -45,41 +50,67 @@ public class CliClient {
systemOut.println(Vespa.VERSION);
return 0;
}
- FeedClient feedClient = createFeedClient(cliArgs);
+ try (InputStream in = createFeedInputStream(cliArgs);
+ JsonStreamFeeder feeder = createJsonFeeder(cliArgs)) {
+ if (cliArgs.benchmarkModeEnabled()) {
+ printBenchmarkResult(feeder.benchmark(in));
+ } else {
+ feeder.feed(in);
+ }
+ }
return 0;
} catch (CliArguments.CliArgumentsException | IOException e) {
- return handleException(e);
+ boolean verbose = cliArgs != null && cliArgs.verboseSpecified();
+ return handleException(verbose, e);
+ } catch (Exception e) {
+ boolean verbose = cliArgs != null && cliArgs.verboseSpecified();
+ return handleException(verbose, "Unknown failure: " + e.getMessage(), e);
}
}
- private static FeedClient createFeedClient(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException {
+ private static FeedClient createFeedClient(CliArguments cliArgs) throws CliArguments.CliArgumentsException {
FeedClientBuilder builder = FeedClientBuilder.create(cliArgs.endpoint());
cliArgs.connections().ifPresent(builder::setMaxConnections);
cliArgs.maxStreamsPerConnection().ifPresent(builder::setMaxConnections);
if (cliArgs.sslHostnameVerificationDisabled()) {
builder.setHostnameVerifier(AcceptAllHostnameVerifier.INSTANCE);
}
- CliArguments.CertificateAndKey certificateAndKey = cliArgs.certificateAndKey().orElse(null);
- Path caCertificates = cliArgs.caCertificates().orElse(null);
- if (certificateAndKey != null || caCertificates != null) {
- SslContextBuilder sslContextBuilder = new SslContextBuilder();
- if (certificateAndKey != null) {
- sslContextBuilder.withCertificateAndKey(certificateAndKey.certificateFile, certificateAndKey.privateKeyFile);
- }
- if (caCertificates != null) {
- sslContextBuilder.withCaCertificates(caCertificates);
- }
- builder.setSslContext(sslContextBuilder.build());
- }
+ cliArgs.certificateAndKey().ifPresent(c -> builder.setCertificate(c.certificateFile, c.privateKeyFile));
+ cliArgs.caCertificates().ifPresent(builder::setCaCertificates);
cliArgs.headers().forEach(builder::addRequestHeader);
return builder.build();
}
- private int handleException(Exception e) { return handleException(e.getMessage(), e); }
+ private static JsonStreamFeeder createJsonFeeder(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException {
+ FeedClient feedClient = createFeedClient(cliArgs);
+ JsonStreamFeeder.Builder builder = JsonStreamFeeder.builder(feedClient);
+ cliArgs.timeout().ifPresent(builder::withTimeout);
+ cliArgs.route().ifPresent(builder::withRoute);
+ cliArgs.traceLevel().ifPresent(builder::withTracelevel);
+ return builder.build();
+ }
+
+ private InputStream createFeedInputStream(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException {
+ return cliArgs.readFeedFromStandardInput() ? systemIn : Files.newInputStream(cliArgs.inputFile().get());
+ }
+
+ private void printBenchmarkResult(JsonStreamFeeder.BenchmarkResult result) throws IOException {
+ JsonFactory factory = new JsonFactory();
+ try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) {
+ generator.writeStartObject();
+ generator.writeNumberField("feeder.runtime", result.duration.toMillis());
+ generator.writeNumberField("feeder.okcount", result.okCount);
+ generator.writeNumberField("feeder.errorcount", result.errorCount);
+ generator.writeNumberField("feeder.throughput", result.throughput);
+ generator.writeEndObject();
+ }
+ }
+
+ private int handleException(boolean verbose, Exception e) { return handleException(verbose, e.getMessage(), e); }
- private int handleException(String message, Exception exception) {
+ private int handleException(boolean verbose, String message, Exception exception) {
systemError.println(message);
- if (debugMode()) {
+ if (debugMode() || verbose) {
exception.printStackTrace(systemError);
}
return 1;
diff --git a/vespa-feed-client-cli/src/test/java/ai/vespa/feed/client/CliArgumentsTest.java b/vespa-feed-client-cli/src/test/java/ai/vespa/feed/client/CliArgumentsTest.java
index 33ee31ff0dc..a99399e638d 100644
--- a/vespa-feed-client-cli/src/test/java/ai/vespa/feed/client/CliArgumentsTest.java
+++ b/vespa-feed-client-cli/src/test/java/ai/vespa/feed/client/CliArgumentsTest.java
@@ -7,6 +7,7 @@ import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.time.Duration;
import static org.junit.jupiter.api.Assertions.*;
@@ -21,9 +22,10 @@ class CliArgumentsTest {
"--endpoint=https://vespa.ai:4443/", "--file=feed.json", "--connections=10",
"--max-streams-per-connection=128", "--certificate=cert.pem", "--private-key=key.pem",
"--ca-certificates=ca-certs.pem", "--disable-ssl-hostname-verification",
- "--header=\"My-Header: my-value\"", "--header", "Another-Header: another-value", "--benchmark"});
+ "--header=\"My-Header: my-value\"", "--header", "Another-Header: another-value", "--benchmark",
+ "--route=myroute", "--timeout=0.125", "--trace=9", "--verbose"});
assertEquals(URI.create("https://vespa.ai:4443/"), args.endpoint());
- assertEquals(Paths.get("feed.json"), args.inputFile());
+ assertEquals(Paths.get("feed.json"), args.inputFile().get());
assertEquals(10, args.connections().getAsInt());
assertEquals(128, args.maxStreamsPerConnection().getAsInt());
assertEquals(Paths.get("cert.pem"), args.certificateAndKey().get().certificateFile);
@@ -36,15 +38,31 @@ class CliArgumentsTest {
assertEquals("my-value", args.headers().get("My-Header"));
assertEquals("another-value", args.headers().get("Another-Header"));
assertTrue(args.benchmarkModeEnabled());
+ assertEquals("myroute", args.route().get());
+ assertEquals(Duration.ofMillis(125), args.timeout().get());
+ assertEquals(9, args.traceLevel().getAsInt());
+ assertTrue(args.verboseSpecified());
}
@Test
- void fails_on_missing_parameters() throws CliArguments.CliArgumentsException {
- CliArguments cliArguments = CliArguments.fromRawArgs(new String[0]);
- CliArguments.CliArgumentsException exception = assertThrows(CliArguments.CliArgumentsException.class, cliArguments::endpoint);
+ void fails_on_missing_parameters() {
+ CliArguments.CliArgumentsException exception = assertThrows(
+ CliArguments.CliArgumentsException.class,
+ () -> CliArguments.fromRawArgs(new String[] {"--file=/path/to/file", "--stdin"}));
assertEquals("Endpoint must be specified", exception.getMessage());
- exception = assertThrows(CliArguments.CliArgumentsException.class, cliArguments::inputFile);
- assertEquals("Feed file must be specified", exception.getMessage());
+ }
+
+ @Test
+ void fails_on_conflicting_parameters() {
+ CliArguments.CliArgumentsException exception = assertThrows(
+ CliArguments.CliArgumentsException.class,
+ () -> CliArguments.fromRawArgs(new String[] {"--endpoint=https://endpoint", "--file=/path/to/file", "--stdin"}));
+ assertEquals("Either option 'file' or 'stdin' must be specified", exception.getMessage());
+
+ exception = assertThrows(
+ CliArguments.CliArgumentsException.class,
+ () -> CliArguments.fromRawArgs(new String[] {"--endpoint=https://endpoint"}));
+ assertEquals("Either option 'file' or 'stdin' must be specified", exception.getMessage());
}
@Test
diff --git a/vespa-feed-client-cli/src/test/resources/help.txt b/vespa-feed-client-cli/src/test/resources/help.txt
index 9ad7642d4ec..29f24e8c672 100644
--- a/vespa-feed-client-cli/src/test/resources/help.txt
+++ b/vespa-feed-client-cli/src/test/resources/help.txt
@@ -11,4 +11,9 @@ Vespa feed client
--help
--max-streams-per-connection <arg>
--private-key <arg>
+ --route <arg>
+ --stdin
+ --timeout <arg>
+ --trace <arg>
+ --verbose
--version
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
index 74965270fb9..17162f19d3f 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
@@ -6,12 +6,15 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.time.Duration;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static ai.vespa.feed.client.FeedClient.OperationType.put;
@@ -28,7 +31,7 @@ import static java.util.Objects.requireNonNull;
/**
* @author jonmv
*/
-public class JsonStreamFeeder {
+public class JsonStreamFeeder implements Closeable {
private final FeedClient client;
private final OperationParameters protoParameters;
@@ -66,24 +69,36 @@ public class JsonStreamFeeder {
* Note that {@code "id"} is an alias for the document put operation.
*/
public void feed(InputStream jsonStream) throws IOException {
- feed(jsonStream, 1 << 26);
+ feed(jsonStream, 1 << 26, false);
}
- void feed(InputStream jsonStream, int size) throws IOException {
+ BenchmarkResult benchmark(InputStream jsonStream) throws IOException {
+ return feed(jsonStream, 1 << 26, true).get();
+ }
+
+ Optional<BenchmarkResult> feed(InputStream jsonStream, int size, boolean benchmark) throws IOException {
RingBufferStream buffer = new RingBufferStream(jsonStream, size);
buffer.expect(JsonToken.START_ARRAY);
+ AtomicInteger okCount = new AtomicInteger();
+ AtomicInteger failedCount = new AtomicInteger();
+ long startTime = System.nanoTime();
CompletableFuture<Result> result;
AtomicReference<Throwable> thrown = new AtomicReference<>();
while ((result = buffer.next()) != null) {
result.whenComplete((r, t) -> {
- if (t != null)
- thrown.set(t);
- else
- ; // Aggregate stats.
+ if (t != null) {
+ failedCount.incrementAndGet();
+ if (!benchmark) thrown.set(t);
+ } else
+ okCount.incrementAndGet();
});
if (thrown.get() != null)
sneakyThrow(thrown.get());
}
+ if (!benchmark) return Optional.empty();
+ Duration duration = Duration.ofNanos(System.nanoTime() - startTime);
+ double throughPut = (double)okCount.get() / duration.toMillis() * 1000D;
+ return Optional.of(new BenchmarkResult(okCount.get(), failedCount.get(), duration, throughPut));
}
@SuppressWarnings("unchecked")
@@ -91,6 +106,8 @@ public class JsonStreamFeeder {
private static final JsonFactory factory = new JsonFactory();
+ @Override public void close() throws IOException { client.close(); }
+
private class RingBufferStream extends InputStream {
private final byte[] b = new byte[1];
@@ -330,4 +347,18 @@ public class JsonStreamFeeder {
}
+ static class BenchmarkResult {
+ final int okCount;
+ final int errorCount;
+ final Duration duration;
+ final double throughput;
+
+ BenchmarkResult(int okCount, int errorCount, Duration duration, double throughput) {
+ this.okCount = okCount;
+ this.errorCount = errorCount;
+ this.duration = duration;
+ this.throughput = throughput;
+ }
+ }
+
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
index 25f64e3c98a..8ef8ae57f5e 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
@@ -60,7 +60,7 @@ class JsonStreamFeederTest {
public void close() throws IOException {
}
- }).build().feed(in, 1 << 7); // TODO: hangs on 1 << 6.
+ }).build().feed(in, 1 << 7, false); // TODO: hangs on 1 << 6.
assertEquals(docs + 1, ids.size());
}