diff options
16 files changed, 293 insertions, 193 deletions
diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java index a6119c5de4f..9d4f3525c32 100644 --- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java +++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java @@ -8,6 +8,7 @@ import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLSession; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.PrintStream; import java.nio.file.Files; import java.time.Duration; @@ -57,18 +58,12 @@ public class CliClient { return 0; } try (InputStream in = createFeedInputStream(cliArgs); - JsonFeeder feeder = createJsonFeeder(cliArgs)) { + FeedClient feedClient = createFeedClient(cliArgs); + JsonFeeder feeder = createJsonFeeder(feedClient, cliArgs)) { + long startNanos = System.nanoTime(); + feeder.feedMany(in).join(); if (cliArgs.benchmarkModeEnabled()) { - BenchmarkResultAggregator aggregator = new BenchmarkResultAggregator(); - feeder.feedMany(in, aggregator).join(); - aggregator.printBenchmarkResult(); - } else { - JsonFeeder.ResultCallback emptyCallback = new JsonFeeder.ResultCallback() { - @Override public void onNextResult(Result result, Throwable error) {} - @Override public void onError(Throwable error) {} - @Override public void onComplete() {} - }; - feeder.feedMany(in, emptyCallback).join(); + printBenchmarkResult(System.nanoTime() - startNanos, feedClient.stats(), systemOut); } } return 0; @@ -94,8 +89,7 @@ public class CliClient { return builder.build(); } - private static JsonFeeder createJsonFeeder(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException { - FeedClient feedClient = createFeedClient(cliArgs); + private static JsonFeeder createJsonFeeder(FeedClient feedClient, CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException { JsonFeeder.Builder builder = JsonFeeder.builder(feedClient); cliArgs.timeout().ifPresent(builder::withTimeout); cliArgs.route().ifPresent(builder::withRoute); @@ -129,42 +123,30 @@ public class CliClient { @Override public boolean verify(String hostname, SSLSession session) { return true; } } - private class BenchmarkResultAggregator implements JsonFeeder.ResultCallback { - - private final AtomicInteger okCount = new AtomicInteger(); - private final AtomicInteger errorCount = new AtomicInteger(); - private volatile long endNanoTime; - private volatile long startNanoTime; - - void start() { this.startNanoTime = System.nanoTime(); } - - void printBenchmarkResult() throws IOException { - JsonFactory factory = new JsonFactory(); - Duration duration = Duration.ofNanos(endNanoTime - startNanoTime); - int okCount = this.okCount.get(); - int errorCount = this.errorCount.get(); - double throughput = (double) okCount / duration.toMillis() * 1000D; - try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) { - generator.writeStartObject(); - generator.writeNumberField("feeder.runtime", duration.toMillis()); - generator.writeNumberField("feeder.okcount", okCount); - generator.writeNumberField("feeder.errorcount", errorCount); - generator.writeNumberField("feeder.throughput", throughput); - generator.writeEndObject(); - } - } - - @Override - public void onNextResult(Result result, Throwable error) { - if (error != null) { - errorCount.incrementAndGet(); - } else { - okCount.incrementAndGet(); - } + static void printBenchmarkResult(long durationNanos, OperationStats stats, OutputStream systemOut) throws IOException { + JsonFactory factory = new JsonFactory(); + long okCount = stats.successes(); + long errorCount = stats.requests() - okCount; + double throughput = okCount * 1e6D / Math.max(1, durationNanos); + try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) { + generator.writeStartObject(); + generator.writeNumberField("feeder.runtime", durationNanos / 1_000_000); + generator.writeNumberField("feeder.okcount", okCount); + generator.writeNumberField("feeder.errorcount", errorCount); + generator.writeNumberField("feeder.throughput", throughput); + generator.writeNumberField("feeder.minlatency", stats.minLatencyMillis()); + generator.writeNumberField("feeder.avglatency", stats.averageLatencyMillis()); + generator.writeNumberField("feeder.maxlatency", stats.maxLatencyMillis()); + generator.writeNumberField("feeder.bytessent", stats.bytesSent()); + generator.writeNumberField("feeder.bytesreceived", stats.bytesReceived()); + + generator.writeObjectFieldStart("feeder.responsecodes"); + for (Map.Entry<Integer, Long> entry : stats.responsesByCode().entrySet()) + generator.writeNumberField(Integer.toString(entry.getKey()), entry.getValue()); + generator.writeEndObject(); + + generator.writeEndObject(); } - - @Override public void onError(Throwable error) {} - - @Override public void onComplete() { this.endNanoTime = System.nanoTime(); } } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java index 5099458f3fe..672f5f080b5 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java @@ -8,6 +8,7 @@ import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder; import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.message.BasicHeader; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.net.URIAuthority; @@ -29,18 +30,23 @@ import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak; /** * @author jonmv */ -class HttpCluster implements Cluster { +class ApacheCluster implements Cluster { private final List<Endpoint> endpoints = new ArrayList<>(); - public HttpCluster(FeedClientBuilder builder) throws IOException { + public ApacheCluster(FeedClientBuilder builder) throws IOException { for (URI endpoint : builder.endpoints) for (int i = 0; i < builder.connectionsPerEndpoint; i++) endpoints.add(new Endpoint(createHttpClient(builder), endpoint)); } @Override - public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) { + public void dispatch(HttpRequest wrapped, CompletableFuture<HttpResponse> vessel) { + SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path()); + wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get())); + if (wrapped.body() != null) + request.setBody(wrapped.body(), ContentType.APPLICATION_JSON); + int index = 0; int min = Integer.MAX_VALUE; for (int i = 0; i < endpoints.size(); i++) @@ -56,7 +62,7 @@ class HttpCluster implements Cluster { request.setAuthority(new URIAuthority(endpoint.url.getHost(), endpoint.url.getPort())); endpoint.client.execute(request, new FutureCallback<SimpleHttpResponse>() { - @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); } + @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); } @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } @Override public void cancelled() { vessel.cancel(false); } }); @@ -148,4 +154,26 @@ class HttpCluster implements Cluster { return sslContextBuilder.build(); } + + private static class ApacheHttpResponse implements HttpResponse { + + private final SimpleHttpResponse wrapped; + + private ApacheHttpResponse(SimpleHttpResponse wrapped) { + this.wrapped = wrapped; + } + + + @Override + public int code() { + return wrapped.getCode(); + } + + @Override + public byte[] body() { + return wrapped.getBodyBytes(); + } + + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java index 1ae8ae1d490..0e9bfe0ef46 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java @@ -1,9 +1,6 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; - import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -41,7 +38,7 @@ public class BenchmarkingCluster implements Cluster { } @Override - public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) { + public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { requests.incrementAndGet(); long startMillis = System.currentTimeMillis(); delegate.dispatch(request, vessel); @@ -49,13 +46,13 @@ public class BenchmarkingCluster implements Cluster { results++; if (thrown == null) { responses++; - responsesByCode[response.getCode()]++; + responsesByCode[response.code()]++; long latency = System.currentTimeMillis() - startMillis; totalLatencyMillis += latency; minLatencyMillis = Math.min(minLatencyMillis, latency); maxLatencyMillis = Math.max(maxLatencyMillis, latency); - bytesSent += request.getBodyBytes() == null ? 0 : request.getBodyBytes().length; - bytesReceived += response.getBodyBytes() == null ? 0 : response.getBodyBytes().length; + bytesSent += request.body() == null ? 0 : request.body().length; + bytesReceived += response.body() == null ? 0 : response.body().length; } else exceptions++; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java index bcf1c4ae107..f428fb567e6 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java @@ -1,9 +1,6 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; - import java.io.Closeable; import java.util.Collections; import java.util.concurrent.CompletableFuture; @@ -14,7 +11,7 @@ import java.util.concurrent.CompletableFuture; interface Cluster extends Closeable { /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. May not throw. */ - void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel); + void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel); @Override default void close() { } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java index eb31d1aa808..25068818396 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java @@ -5,4 +5,11 @@ package ai.vespa.feed.client; * @author bjorncs */ public class FeedException extends RuntimeException { + + public FeedException(String message) { super(message); } + + public FeedException(String message, Throwable cause) { super(message, cause); } + + public FeedException(Throwable cause) { super(cause); } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index 6e7e20ae121..c572f84db54 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -4,17 +4,14 @@ package ai.vespa.feed.client; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.net.URIBuilder; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.PrintStream; import java.io.UncheckedIOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -23,6 +20,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; /** @@ -89,37 +87,29 @@ class HttpFeedClient implements FeedClient { ensureOpen(); String path = operationPath(documentId, params).toString(); - SimpleHttpRequest request = new SimpleHttpRequest(method, path); - requestHeaders.forEach((name, value) -> request.setHeader(name, value.get())); - if (operationJson != null) - request.setBody(operationJson, ContentType.APPLICATION_JSON); + HttpRequest request = new HttpRequest(method, path, requestHeaders, operationJson.getBytes(UTF_8)); // TODO: make it bytes all the way? return requestStrategy.enqueue(documentId, request) - .handle((response, thrown) -> { - if (thrown != null) { - // TODO: What to do with exceptions here? Ex on 400, 401, 403, etc, and wrap and throw? - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - thrown.printStackTrace(new PrintStream(buffer)); - return new Result(Result.Type.failure, documentId, buffer.toString(), null); - } - return toResult(response, documentId); - }); + .thenApply(response -> toResult(request, response, documentId)); } - static Result toResult(SimpleHttpResponse response, DocumentId documentId) { + static Result toResult(HttpRequest request, HttpResponse response, DocumentId documentId) { Result.Type type; - switch (response.getCode()) { + switch (response.code()) { case 200: type = Result.Type.success; break; case 412: type = Result.Type.conditionNotMet; break; - default: type = Result.Type.failure; + case 502: + case 504: + case 507: type = Result.Type.failure; break; + default: type = null; } String message = null; String trace = null; try { - JsonParser parser = factory.createParser(response.getBodyText()); + JsonParser parser = factory.createParser(response.body()); if (parser.nextToken() != JsonToken.START_OBJECT) - throw new IllegalArgumentException("Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " + response.getBodyText()); + throw new IllegalArgumentException("Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " + new String(response.body(), UTF_8)); String name; while ((name = parser.nextFieldName()) != null) { @@ -131,12 +121,16 @@ class HttpFeedClient implements FeedClient { } if (parser.currentToken() != JsonToken.END_OBJECT) - throw new IllegalArgumentException("Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: " + response.getBodyText()); + throw new IllegalArgumentException("Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: " + new String(response.body(), UTF_8)); } catch (IOException e) { throw new UncheckedIOException(e); } + if (type == null) // Not a Vespa response, but a failure in the HTTP layer. + throw new FeedException("Status " + response.code() + " executing '" + request + + "': " + (message == null ? new String(response.body(), UTF_8) : message)); + return new Result(type, documentId, message, trace); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java new file mode 100644 index 00000000000..8da2f46def2 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java @@ -0,0 +1,42 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import java.util.Map; +import java.util.function.Supplier; + +class HttpRequest { + + private final String method; + private final String path; + private final Map<String, Supplier<String>> headers; + private final byte[] body; + + public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body) { + this.method = method; + this.path = path; + this.headers = headers; + this.body = body; + } + + public String method() { + return method; + } + + public String path() { + return path; + } + + public Map<String, Supplier<String>> headers() { + return headers; + } + + public byte[] body() { + return body; + } + + @Override + public String toString() { + return method + " " + path; + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 408488cbaec..5646d37cde3 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -3,8 +3,6 @@ package ai.vespa.feed.client; import ai.vespa.feed.client.FeedClient.CircuitBreaker; import ai.vespa.feed.client.FeedClient.RetryStrategy; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import java.io.IOException; import java.util.Map; @@ -23,6 +21,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.lang.Math.max; import static java.lang.Math.min; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; @@ -60,7 +59,7 @@ class HttpRequestStrategy implements RequestStrategy { }); HttpRequestStrategy(FeedClientBuilder builder) throws IOException { - this(builder, new BenchmarkingCluster(new HttpCluster(builder))); + this(builder, new BenchmarkingCluster(new ApacheCluster(builder))); } HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) { @@ -112,15 +111,15 @@ class HttpRequestStrategy implements RequestStrategy { return inflight.get() - delayedCount.get() > targetInflight(); } - private boolean retry(SimpleHttpRequest request, int attempt) { + private boolean retry(HttpRequest request, int attempt) { if (attempt > strategy.retries()) return false; - switch (request.getMethod().toUpperCase()) { + switch (request.method().toUpperCase()) { case "POST": return strategy.retry(FeedClient.OperationType.PUT); case "PUT": return strategy.retry(FeedClient.OperationType.UPDATE); case "DELETE": return strategy.retry(FeedClient.OperationType.REMOVE); - default: throw new IllegalStateException("Unexpected HTTP method: " + request.getMethod()); + default: throw new IllegalStateException("Unexpected HTTP method: " + request.method()); } } @@ -128,7 +127,7 @@ class HttpRequestStrategy implements RequestStrategy { * Retries all IOExceptions, unless error rate has converged to a value higher than the threshold, * or the user has turned off retries for this type of operation. */ - private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) { + private boolean retry(HttpRequest request, Throwable thrown, int attempt) { breaker.failure(); log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request); @@ -151,23 +150,23 @@ class HttpRequestStrategy implements RequestStrategy { } /** Retries throttled requests (429, 503), adjusting the target inflight count, and server errors (500, 502). */ - private boolean retry(SimpleHttpRequest request, SimpleHttpResponse response, int attempt) { - if (response.getCode() / 100 == 2) { + private boolean retry(HttpRequest request, HttpResponse response, int attempt) { + if (response.code() / 100 == 2) { breaker.success(); incrementTargetInflight(); return false; } - log.log(FINE, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + + log.log(FINE, () -> "Status code " + response.code() + " (" + new String(response.body(), UTF_8) + ") on attempt " + attempt + " at " + request); - if (response.getCode() == 429 || response.getCode() == 503) { // Throttling; reduce target inflight. + if (response.code() == 429 || response.code() == 503) { // Throttling; reduce target inflight. decreaseTargetInflight(); return true; } breaker.failure(); - if (response.getCode() == 500 || response.getCode() == 502 || response.getCode() == 504) // Hopefully temporary errors. + if (response.code() == 500 || response.code() == 502 || response.code() == 504) // Hopefully temporary errors. return retry(request, attempt); return false; @@ -205,9 +204,9 @@ class HttpRequestStrategy implements RequestStrategy { } @Override - public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request) { - CompletableFuture<SimpleHttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries. - CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client. + public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { + CompletableFuture<HttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries. + CompletableFuture<HttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client. CompletableFuture<?> previous = inflightById.put(documentId, result); if (destroyed.get()) { result.cancel(true); @@ -232,14 +231,14 @@ class HttpRequestStrategy implements RequestStrategy { } /** Handles the result of one attempt at the given operation, retrying if necessary. */ - private void handleAttempt(CompletableFuture<SimpleHttpResponse> vessel, SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result, int attempt) { + private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, CompletableFuture<HttpResponse> result, int attempt) { vessel.whenCompleteAsync((response, thrown) -> { // Retry the operation if it failed with a transient error ... if (thrown != null ? retry(request, thrown, attempt) : retry(request, response, attempt)) { retries.incrementAndGet(); CircuitBreaker.State state = breaker.state(); - CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>(); + CompletableFuture<HttpResponse> retry = new CompletableFuture<>(); offer(() -> cluster.dispatch(request, retry)); handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1)); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java new file mode 100644 index 00000000000..b1dd54240eb --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java @@ -0,0 +1,16 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +interface HttpResponse { + + int code(); + byte[] body(); + + static HttpResponse of(int code, byte[] body) { + return new HttpResponse() { + @Override public int code() { return code; } + @Override public byte[] body() { return body; } + }; + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java index 2a6d2e15747..de32e7abdf5 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -58,18 +58,18 @@ public class JsonFeeder implements Closeable { * @param result Non-null if operation completed successfully * @param error Non-null if operation failed */ - void onNextResult(Result result, Throwable error); + default void onNextResult(Result result, Throwable error) { } /** * Invoked if an unrecoverable error occurred during feed processing, * after which no other {@link ResultCallback} methods are invoked. */ - void onError(Throwable error); + default void onError(Throwable error) { } /** * Invoked when all feed operations are either completed successfully or failed. */ - void onComplete(); + default void onComplete() { } } public static Builder builder(FeedClient client) { return new Builder(client); } @@ -103,6 +103,10 @@ public class JsonFeeder implements Closeable { return feedMany(jsonStream, 1 << 26, resultCallback); } + public CompletableFuture<Void> feedMany(InputStream jsonStream) { + return feedMany(jsonStream, new ResultCallback() { }); + } + CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) { RingBufferStream buffer = new RingBufferStream(jsonStream, size); CompletableFuture<Void> overallResult = new CompletableFuture<>(); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java index 9305709d873..d36475a51fb 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java @@ -42,6 +42,10 @@ public class OperationStats { return requests - inflight; } + public long successes() { + return responsesByCode.getOrDefault(200, 0L); + } + public Map<Integer, Long> responsesByCode() { return responsesByCode; } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java index bc2707bc490..7764dce712b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java @@ -1,12 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; - -import java.io.Closeable; import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; /** * Controls execution of feed operations. @@ -28,6 +23,6 @@ interface RequestStrategy { void await(); /** Enqueue the given operation, returning its future result. This may block if the client send queue is full. */ - CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request); + CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request); } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java index 65fbcb12204..0605ed5bf65 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java @@ -1,20 +1,18 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; -import org.apache.hc.core5.http.ContentType; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.net.URI; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.function.BiConsumer; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; /** * @author jonmv @@ -22,50 +20,82 @@ import static org.junit.jupiter.api.Assertions.assertEquals; class HttpFeedClientTest { @Test - void testRequestGeneration() throws IOException, ExecutionException, InterruptedException { + void testFeeding() throws ExecutionException, InterruptedException { DocumentId id = DocumentId.of("ns", "type", "0"); + AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); class MockRequestStrategy implements RequestStrategy { @Override public OperationStats stats() { throw new UnsupportedOperationException(); } @Override public boolean hasFailed() { return false; } @Override public void destroy() { throw new UnsupportedOperationException(); } @Override public void await() { throw new UnsupportedOperationException(); } - @Override public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request) { - try { - assertEquals(id, documentId); - assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route", - request.getUri().toString()); - assertEquals("json", request.getBodyText()); + @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } + } + FeedClient client = new HttpFeedClient(FeedClientBuilder.create(URI.create("https://dummy:123")), new MockRequestStrategy()); - SimpleHttpResponse response = new SimpleHttpResponse(502); - response.setBody("{\n" + - " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" + - " \"id\": \"id:ns:type::0\",\n" + - " \"message\": \"Ooops! ... I did it again.\",\n" + - " \"trace\": \"I played with your heart. Got lost in the game.\"\n" + - "}", - ContentType.APPLICATION_JSON); - return CompletableFuture.completedFuture(response); - } - catch (Throwable thrown) { - CompletableFuture<SimpleHttpResponse> failed = new CompletableFuture<>(); - failed.completeExceptionally(thrown); - return failed; - } - } + // Vespa error is an error result. + dispatch.set((documentId, request) -> { + try { + assertEquals(id, documentId); + assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route", + request.path()); + assertEquals("json", new String(request.body(), UTF_8)); - } - Result result = new HttpFeedClient(FeedClientBuilder.create(URI.create("https://dummy:123")), - new MockRequestStrategy()) - .put(id, - "json", - OperationParameters.empty() - .createIfNonExistent(true) - .testAndSetCondition("false") - .route("route") - .timeout(Duration.ofSeconds(5))) - .get(); + HttpResponse response = HttpResponse.of(502, + ("{\n" + + " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" + + " \"id\": \"id:ns:type::0\",\n" + + " \"message\": \"Ooops! ... I did it again.\",\n" + + " \"trace\": \"I played with your heart. Got lost in the game.\"\n" + + "}").getBytes(UTF_8)); + return CompletableFuture.completedFuture(response); + } + catch (Throwable thrown) { + CompletableFuture<HttpResponse> failed = new CompletableFuture<>(); + failed.completeExceptionally(thrown); + return failed; + } + }); + Result result = client.put(id, + "json", + OperationParameters.empty() + .createIfNonExistent(true) + .testAndSetCondition("false") + .route("route") + .timeout(Duration.ofSeconds(5))) + .get(); assertEquals("Ooops! ... I did it again.", result.resultMessage().get()); assertEquals("I played with your heart. Got lost in the game.", result.traceMessage().get()); + + + // Handler error is a FeedException. + dispatch.set((documentId, request) -> { + try { + assertEquals(id, documentId); + assertEquals("/document/v1/ns/type/docid/0", + request.path()); + assertEquals("json", new String(request.body(), UTF_8)); + + HttpResponse response = HttpResponse.of(500, + ("{\n" + + " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" + + " \"id\": \"id:ns:type::0\",\n" + + " \"message\": \"Alla ska i jorden.\",\n" + + " \"trace\": \"Din tid den kom, och senn så for den. \"\n" + + "}").getBytes(UTF_8)); + return CompletableFuture.completedFuture(response); + } + catch (Throwable thrown) { + CompletableFuture<HttpResponse> failed = new CompletableFuture<>(); + failed.completeExceptionally(thrown); + return failed; + } + }); + ExecutionException expected = assertThrows(ExecutionException.class, + () -> client.put(id, + "json", + OperationParameters.empty()) + .get()); + assertEquals("Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage()); } } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java index 7411f4124e5..d3005227184 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java @@ -2,8 +2,6 @@ package ai.vespa.feed.client; import ai.vespa.feed.client.FeedClient.CircuitBreaker; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.core5.http.ContentType; import org.junit.jupiter.api.Test; @@ -35,9 +33,8 @@ class HttpRequestStrategyTest { @Test void testConcurrency() { int documents = 1 << 16; - SimpleHttpRequest request = new SimpleHttpRequest("PUT", "/"); - SimpleHttpResponse response = new SimpleHttpResponse(200); - response.setBody("{}".getBytes(UTF_8), ContentType.APPLICATION_JSON); + HttpRequest request = new HttpRequest("PUT", "/", null, null); + HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), 100, TimeUnit.MILLISECONDS)); @@ -84,7 +81,7 @@ class HttpRequestStrategyTest { DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); - SimpleHttpRequest request = new SimpleHttpRequest("POST", "/"); + HttpRequest request = new HttpRequest("POST", "/", null, null); // Runtime exception is not retried. cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); @@ -101,17 +98,17 @@ class HttpRequestStrategyTest { assertEquals(3, strategy.stats().requests()); // Successful response is returned - SimpleHttpResponse success = new SimpleHttpResponse(200); + HttpResponse success = HttpResponse.of(200, null); cluster.expect((__, vessel) -> vessel.complete(success)); assertEquals(success, strategy.enqueue(id1, request).get()); assertEquals(4, strategy.stats().requests()); // Throttled requests are retried. Concurrent operations to same ID (only) are serialised. now.set(2000); - SimpleHttpResponse throttled = new SimpleHttpResponse(429); + HttpResponse throttled = HttpResponse.of(429, null); AtomicInteger count = new AtomicInteger(3); CountDownLatch latch = new CountDownLatch(1); - AtomicReference<CompletableFuture<SimpleHttpResponse>> completion = new AtomicReference<>(); + AtomicReference<CompletableFuture<HttpResponse>> completion = new AtomicReference<>(); cluster.expect((req, vessel) -> { if (req == request) { if (count.decrementAndGet() > 0) @@ -123,9 +120,9 @@ class HttpRequestStrategyTest { } else vessel.complete(success); }); - CompletableFuture<SimpleHttpResponse> delayed = strategy.enqueue(id1, request); - CompletableFuture<SimpleHttpResponse> serialised = strategy.enqueue(id1, new SimpleHttpRequest("PUT", "/")); - assertEquals(success, strategy.enqueue(id2, new SimpleHttpRequest("DELETE", "/")).get()); + CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request); + CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)); + assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null)).get()); latch.await(); assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2. now.set(4000); @@ -135,7 +132,7 @@ class HttpRequestStrategyTest { assertEquals(success, serialised.get()); // Some error responses are retried. - SimpleHttpResponse serverError = new SimpleHttpResponse(500); + HttpResponse serverError = HttpResponse.of(500, null); cluster.expect((__, vessel) -> vessel.complete(serverError)); assertEquals(serverError, strategy.enqueue(id1, request).get()); assertEquals(11, strategy.stats().requests()); @@ -143,11 +140,11 @@ class HttpRequestStrategyTest { // Error responses are not retried when not of appropriate type. cluster.expect((__, vessel) -> vessel.complete(serverError)); - assertEquals(serverError, strategy.enqueue(id1, new SimpleHttpRequest("PUT", "/")).get()); + assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)).get()); assertEquals(12, strategy.stats().requests()); // Some error responses are not retried. - SimpleHttpResponse badRequest = new SimpleHttpResponse(400); + HttpResponse badRequest = HttpResponse.of(400, null); cluster.expect((__, vessel) -> vessel.complete(badRequest)); assertEquals(badRequest, strategy.enqueue(id1, request).get()); assertEquals(13, strategy.stats().requests()); @@ -169,14 +166,14 @@ class HttpRequestStrategyTest { static class MockCluster implements Cluster { - final AtomicReference<BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>>> dispatch = new AtomicReference<>(); + final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); - void expect(BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> expected) { + void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) { dispatch.set(expected); } @Override - public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) { + public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { dispatch.get().accept(request, vessel); } diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 789a2ab537f..9db296e33cd 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -122,7 +122,7 @@ import static java.util.stream.Collectors.toUnmodifiableMap; */ public class DocumentV1ApiHandler extends AbstractRequestHandler { - private static final Duration defaultTimeout = Duration.ofSeconds(175); + private static final Duration defaultTimeout = Duration.ofSeconds(180); // Match document API default timeout. private static final Logger log = Logger.getLogger(DocumentV1ApiHandler.class.getName()); private static final Parser<Integer> integerParser = Integer::parseInt; @@ -160,6 +160,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static final String TRACELEVEL = "tracelevel"; private final Clock clock; + private final Duration handlerTimeout; private final Metric metric; private final DocumentApiMetrics metrics; private final DocumentOperationParser parser; @@ -184,14 +185,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig, DocumentOperationExecutorConfig executorConfig) { - this(Clock.systemUTC(), metric, metricReceiver, documentAccess, + this(Clock.systemUTC(), Duration.ofSeconds(5), metric, metricReceiver, documentAccess, documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig); } - DocumentV1ApiHandler(Clock clock, Metric metric, MetricReceiver metricReceiver, DocumentAccess access, + DocumentV1ApiHandler(Clock clock, Duration handlerTimeout, Metric metric, MetricReceiver metricReceiver, DocumentAccess access, DocumentmanagerConfig documentmanagerConfig, DocumentOperationExecutorConfig executorConfig, ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig) { this.clock = clock; + this.handlerTimeout = handlerTimeout; this.parser = new DocumentOperationParser(documentmanagerConfig); this.metric = metric; this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1"); @@ -222,8 +224,9 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { HttpRequest request = (HttpRequest) rawRequest; try { - request.setTimeout(getProperty(request, TIMEOUT, timeoutMillisParser) - .orElse(defaultTimeout.toMillis()), + // Set a higher HTTP layer timeout than the document API timeout, to prefer triggering the latter. + request.setTimeout( getProperty(request, TIMEOUT, timeoutMillisParser).orElse(defaultTimeout.toMillis()) + + handlerTimeout.toMillis(), TimeUnit.MILLISECONDS); Path requestPath = new Path(request.getUri()); @@ -251,7 +254,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { @Override public void handleTimeout(Request request, ResponseHandler responseHandler) { - timeout((HttpRequest) request, "Request timeout after " + request.getTimeout(TimeUnit.MILLISECONDS) + "ms", responseHandler); + timeout((HttpRequest) request, "Timeout after " + (request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()) + "ms", responseHandler); } @Override @@ -970,7 +973,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { parameters.setMaxTotalHits(wantedDocumentCount); parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency)); parameters.visitInconsistentBuckets(true); - parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - 5000)); + parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis())); return parameters; } @@ -980,7 +983,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { VisitorParameters parameters = parseCommonParameters(request, path, Optional.of(requireProperty(request, CLUSTER))); parameters.setThrottlePolicy(new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1)); long timeChunk = getProperty(request, TIME_CHUNK, timeoutMillisParser).orElse(60_000L); - parameters.setSessionTimeoutMs(Math.max(1, Math.min(timeChunk, request.getTimeout(TimeUnit.MILLISECONDS) - 5000L))); + parameters.setSessionTimeoutMs(Math.max(1, Math.min(timeChunk, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()))); return parameters; } diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 49fa15849ce..29ae7f52265 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -60,6 +60,7 @@ import org.junit.Test; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; @@ -133,7 +134,7 @@ public class DocumentV1ApiTest { access = new MockDocumentAccess(docConfig); metric = new NullMetric(); metrics = new MetricReceiver.MockReceiver(); - handler = new DocumentV1ApiHandler(clock, metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig); + handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig); } @After @@ -179,7 +180,7 @@ public class DocumentV1ApiTest { } @Test - public void testResponses() { + public void testResponses() throws InterruptedException { RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null)); // GET at non-existent path returns 404 with available paths @@ -207,7 +208,7 @@ public class DocumentV1ApiTest { assertEquals(100, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount()); assertEquals("[id]", parameters.getFieldSet()); assertEquals("(all the things)", parameters.getDocumentSelection()); - assertEquals(1000, parameters.getSessionTimeoutMs()); + assertEquals(6000, parameters.getSessionTimeoutMs()); // Put some documents in the response parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0)); parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1)); @@ -272,7 +273,7 @@ public class DocumentV1ApiTest { access.expect(parameters -> { assertEquals("[Content:cluster=content]", parameters.getRemoteDataHandler()); assertEquals("[all]", parameters.fieldSet()); - assertEquals(55_000L, parameters.getSessionTimeoutMs()); + assertEquals(60_000L, parameters.getSessionTimeoutMs()); parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "We made it!"); }); response = driver.sendRequest("http://localhost/document/v1/space/music/docid?destinationCluster=content&selection=true&cluster=content&timeout=60", POST); @@ -666,14 +667,18 @@ public class DocumentV1ApiTest { handler.set(parameters.responseHandler().get()); return new Result(Result.ResultType.SUCCESS, null); }); - var response4 = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?timeout=1ms"); - assertSameJson("{" + - " \"pathId\": \"/document/v1/space/music/docid/one\"," + - " \"message\": \"Request timeout after 1ms\"" + - "}", response4.readAll()); - assertEquals(504, response4.getStatus()); - if (handler.get() != null) // Timeout may have occurred before dispatch, or ... - handler.get().handleResponse(new Response(0)); // response may eventually arrive, but too late. + try { + var response4 = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?timeout=1ms"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid/one\"," + + " \"message\": \"Timeout after 1ms\"" + + "}", response4.readAll()); + assertEquals(504, response4.getStatus()); + } + finally { + if (handler.get() != null) // Timeout may have occurred before dispatch, or ... + handler.get().handleResponse(new Response(0)); // response may eventually arrive, but too late. + } driver.close(); } @@ -681,7 +686,7 @@ public class DocumentV1ApiTest { @Test public void testThroughput() throws InterruptedException { DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build(); - handler = new DocumentV1ApiHandler(clock, metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig); + handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig); int writers = 4; int queueFill = executorConfig.maxThrottled() - writers; @@ -742,7 +747,7 @@ public class DocumentV1ApiTest { }); } latch.await(); - System.err.println((System.nanoTime() - startNanos) * 1e-9 + " seconds total"); + System.err.println(docs + " requests in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); assertNull(failed.get()); driver.close(); |