summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java80
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java)36
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java11
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java5
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java7
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java40
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java42
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java33
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java16
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java10
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java4
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java7
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java110
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java33
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java19
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java33
16 files changed, 293 insertions, 193 deletions
diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
index a6119c5de4f..9d4f3525c32 100644
--- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
+++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
@@ -8,6 +8,7 @@ import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLSession;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.file.Files;
import java.time.Duration;
@@ -57,18 +58,12 @@ public class CliClient {
return 0;
}
try (InputStream in = createFeedInputStream(cliArgs);
- JsonFeeder feeder = createJsonFeeder(cliArgs)) {
+ FeedClient feedClient = createFeedClient(cliArgs);
+ JsonFeeder feeder = createJsonFeeder(feedClient, cliArgs)) {
+ long startNanos = System.nanoTime();
+ feeder.feedMany(in).join();
if (cliArgs.benchmarkModeEnabled()) {
- BenchmarkResultAggregator aggregator = new BenchmarkResultAggregator();
- feeder.feedMany(in, aggregator).join();
- aggregator.printBenchmarkResult();
- } else {
- JsonFeeder.ResultCallback emptyCallback = new JsonFeeder.ResultCallback() {
- @Override public void onNextResult(Result result, Throwable error) {}
- @Override public void onError(Throwable error) {}
- @Override public void onComplete() {}
- };
- feeder.feedMany(in, emptyCallback).join();
+ printBenchmarkResult(System.nanoTime() - startNanos, feedClient.stats(), systemOut);
}
}
return 0;
@@ -94,8 +89,7 @@ public class CliClient {
return builder.build();
}
- private static JsonFeeder createJsonFeeder(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException {
- FeedClient feedClient = createFeedClient(cliArgs);
+ private static JsonFeeder createJsonFeeder(FeedClient feedClient, CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException {
JsonFeeder.Builder builder = JsonFeeder.builder(feedClient);
cliArgs.timeout().ifPresent(builder::withTimeout);
cliArgs.route().ifPresent(builder::withRoute);
@@ -129,42 +123,30 @@ public class CliClient {
@Override public boolean verify(String hostname, SSLSession session) { return true; }
}
- private class BenchmarkResultAggregator implements JsonFeeder.ResultCallback {
-
- private final AtomicInteger okCount = new AtomicInteger();
- private final AtomicInteger errorCount = new AtomicInteger();
- private volatile long endNanoTime;
- private volatile long startNanoTime;
-
- void start() { this.startNanoTime = System.nanoTime(); }
-
- void printBenchmarkResult() throws IOException {
- JsonFactory factory = new JsonFactory();
- Duration duration = Duration.ofNanos(endNanoTime - startNanoTime);
- int okCount = this.okCount.get();
- int errorCount = this.errorCount.get();
- double throughput = (double) okCount / duration.toMillis() * 1000D;
- try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) {
- generator.writeStartObject();
- generator.writeNumberField("feeder.runtime", duration.toMillis());
- generator.writeNumberField("feeder.okcount", okCount);
- generator.writeNumberField("feeder.errorcount", errorCount);
- generator.writeNumberField("feeder.throughput", throughput);
- generator.writeEndObject();
- }
- }
-
- @Override
- public void onNextResult(Result result, Throwable error) {
- if (error != null) {
- errorCount.incrementAndGet();
- } else {
- okCount.incrementAndGet();
- }
+ static void printBenchmarkResult(long durationNanos, OperationStats stats, OutputStream systemOut) throws IOException {
+ JsonFactory factory = new JsonFactory();
+ long okCount = stats.successes();
+ long errorCount = stats.requests() - okCount;
+ double throughput = okCount * 1e6D / Math.max(1, durationNanos);
+ try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) {
+ generator.writeStartObject();
+ generator.writeNumberField("feeder.runtime", durationNanos / 1_000_000);
+ generator.writeNumberField("feeder.okcount", okCount);
+ generator.writeNumberField("feeder.errorcount", errorCount);
+ generator.writeNumberField("feeder.throughput", throughput);
+ generator.writeNumberField("feeder.minlatency", stats.minLatencyMillis());
+ generator.writeNumberField("feeder.avglatency", stats.averageLatencyMillis());
+ generator.writeNumberField("feeder.maxlatency", stats.maxLatencyMillis());
+ generator.writeNumberField("feeder.bytessent", stats.bytesSent());
+ generator.writeNumberField("feeder.bytesreceived", stats.bytesReceived());
+
+ generator.writeObjectFieldStart("feeder.responsecodes");
+ for (Map.Entry<Integer, Long> entry : stats.responsesByCode().entrySet())
+ generator.writeNumberField(Integer.toString(entry.getKey()), entry.getValue());
+ generator.writeEndObject();
+
+ generator.writeEndObject();
}
-
- @Override public void onError(Throwable error) {}
-
- @Override public void onComplete() { this.endNanoTime = System.nanoTime(); }
}
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
index 5099458f3fe..672f5f080b5 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
@@ -8,6 +8,7 @@ import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.net.URIAuthority;
@@ -29,18 +30,23 @@ import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak;
/**
* @author jonmv
*/
-class HttpCluster implements Cluster {
+class ApacheCluster implements Cluster {
private final List<Endpoint> endpoints = new ArrayList<>();
- public HttpCluster(FeedClientBuilder builder) throws IOException {
+ public ApacheCluster(FeedClientBuilder builder) throws IOException {
for (URI endpoint : builder.endpoints)
for (int i = 0; i < builder.connectionsPerEndpoint; i++)
endpoints.add(new Endpoint(createHttpClient(builder), endpoint));
}
@Override
- public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) {
+ public void dispatch(HttpRequest wrapped, CompletableFuture<HttpResponse> vessel) {
+ SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path());
+ wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get()));
+ if (wrapped.body() != null)
+ request.setBody(wrapped.body(), ContentType.APPLICATION_JSON);
+
int index = 0;
int min = Integer.MAX_VALUE;
for (int i = 0; i < endpoints.size(); i++)
@@ -56,7 +62,7 @@ class HttpCluster implements Cluster {
request.setAuthority(new URIAuthority(endpoint.url.getHost(), endpoint.url.getPort()));
endpoint.client.execute(request,
new FutureCallback<SimpleHttpResponse>() {
- @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); }
+ @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); }
@Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
@Override public void cancelled() { vessel.cancel(false); }
});
@@ -148,4 +154,26 @@ class HttpCluster implements Cluster {
return sslContextBuilder.build();
}
+
+ private static class ApacheHttpResponse implements HttpResponse {
+
+ private final SimpleHttpResponse wrapped;
+
+ private ApacheHttpResponse(SimpleHttpResponse wrapped) {
+ this.wrapped = wrapped;
+ }
+
+
+ @Override
+ public int code() {
+ return wrapped.getCode();
+ }
+
+ @Override
+ public byte[] body() {
+ return wrapped.getBodyBytes();
+ }
+
+ }
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java
index 1ae8ae1d490..0e9bfe0ef46 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java
@@ -1,9 +1,6 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -41,7 +38,7 @@ public class BenchmarkingCluster implements Cluster {
}
@Override
- public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) {
+ public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
requests.incrementAndGet();
long startMillis = System.currentTimeMillis();
delegate.dispatch(request, vessel);
@@ -49,13 +46,13 @@ public class BenchmarkingCluster implements Cluster {
results++;
if (thrown == null) {
responses++;
- responsesByCode[response.getCode()]++;
+ responsesByCode[response.code()]++;
long latency = System.currentTimeMillis() - startMillis;
totalLatencyMillis += latency;
minLatencyMillis = Math.min(minLatencyMillis, latency);
maxLatencyMillis = Math.max(maxLatencyMillis, latency);
- bytesSent += request.getBodyBytes() == null ? 0 : request.getBodyBytes().length;
- bytesReceived += response.getBodyBytes() == null ? 0 : response.getBodyBytes().length;
+ bytesSent += request.body() == null ? 0 : request.body().length;
+ bytesReceived += response.body() == null ? 0 : response.body().length;
}
else
exceptions++;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
index bcf1c4ae107..f428fb567e6 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
@@ -1,9 +1,6 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-
import java.io.Closeable;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
@@ -14,7 +11,7 @@ import java.util.concurrent.CompletableFuture;
interface Cluster extends Closeable {
/** Dispatch the request to the cluster, causing the response vessel to complete at a later time. May not throw. */
- void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel);
+ void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel);
@Override
default void close() { }
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java
index eb31d1aa808..25068818396 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java
@@ -5,4 +5,11 @@ package ai.vespa.feed.client;
* @author bjorncs
*/
public class FeedException extends RuntimeException {
+
+ public FeedException(String message) { super(message); }
+
+ public FeedException(String message, Throwable cause) { super(message, cause); }
+
+ public FeedException(Throwable cause) { super(cause); }
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
index 6e7e20ae121..c572f84db54 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
@@ -4,17 +4,14 @@ package ai.vespa.feed.client;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.net.URIBuilder;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -23,6 +20,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
/**
@@ -89,37 +87,29 @@ class HttpFeedClient implements FeedClient {
ensureOpen();
String path = operationPath(documentId, params).toString();
- SimpleHttpRequest request = new SimpleHttpRequest(method, path);
- requestHeaders.forEach((name, value) -> request.setHeader(name, value.get()));
- if (operationJson != null)
- request.setBody(operationJson, ContentType.APPLICATION_JSON);
+ HttpRequest request = new HttpRequest(method, path, requestHeaders, operationJson.getBytes(UTF_8)); // TODO: make it bytes all the way?
return requestStrategy.enqueue(documentId, request)
- .handle((response, thrown) -> {
- if (thrown != null) {
- // TODO: What to do with exceptions here? Ex on 400, 401, 403, etc, and wrap and throw?
- ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- thrown.printStackTrace(new PrintStream(buffer));
- return new Result(Result.Type.failure, documentId, buffer.toString(), null);
- }
- return toResult(response, documentId);
- });
+ .thenApply(response -> toResult(request, response, documentId));
}
- static Result toResult(SimpleHttpResponse response, DocumentId documentId) {
+ static Result toResult(HttpRequest request, HttpResponse response, DocumentId documentId) {
Result.Type type;
- switch (response.getCode()) {
+ switch (response.code()) {
case 200: type = Result.Type.success; break;
case 412: type = Result.Type.conditionNotMet; break;
- default: type = Result.Type.failure;
+ case 502:
+ case 504:
+ case 507: type = Result.Type.failure; break;
+ default: type = null;
}
String message = null;
String trace = null;
try {
- JsonParser parser = factory.createParser(response.getBodyText());
+ JsonParser parser = factory.createParser(response.body());
if (parser.nextToken() != JsonToken.START_OBJECT)
- throw new IllegalArgumentException("Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " + response.getBodyText());
+ throw new IllegalArgumentException("Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " + new String(response.body(), UTF_8));
String name;
while ((name = parser.nextFieldName()) != null) {
@@ -131,12 +121,16 @@ class HttpFeedClient implements FeedClient {
}
if (parser.currentToken() != JsonToken.END_OBJECT)
- throw new IllegalArgumentException("Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: " + response.getBodyText());
+ throw new IllegalArgumentException("Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: " + new String(response.body(), UTF_8));
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
+ if (type == null) // Not a Vespa response, but a failure in the HTTP layer.
+ throw new FeedException("Status " + response.code() + " executing '" + request +
+ "': " + (message == null ? new String(response.body(), UTF_8) : message));
+
return new Result(type, documentId, message, trace);
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java
new file mode 100644
index 00000000000..8da2f46def2
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java
@@ -0,0 +1,42 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.Map;
+import java.util.function.Supplier;
+
+class HttpRequest {
+
+ private final String method;
+ private final String path;
+ private final Map<String, Supplier<String>> headers;
+ private final byte[] body;
+
+ public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body) {
+ this.method = method;
+ this.path = path;
+ this.headers = headers;
+ this.body = body;
+ }
+
+ public String method() {
+ return method;
+ }
+
+ public String path() {
+ return path;
+ }
+
+ public Map<String, Supplier<String>> headers() {
+ return headers;
+ }
+
+ public byte[] body() {
+ return body;
+ }
+
+ @Override
+ public String toString() {
+ return method + " " + path;
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 408488cbaec..5646d37cde3 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -3,8 +3,6 @@ package ai.vespa.feed.client;
import ai.vespa.feed.client.FeedClient.CircuitBreaker;
import ai.vespa.feed.client.FeedClient.RetryStrategy;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import java.io.IOException;
import java.util.Map;
@@ -23,6 +21,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
import static java.lang.Math.max;
import static java.lang.Math.min;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
@@ -60,7 +59,7 @@ class HttpRequestStrategy implements RequestStrategy {
});
HttpRequestStrategy(FeedClientBuilder builder) throws IOException {
- this(builder, new BenchmarkingCluster(new HttpCluster(builder)));
+ this(builder, new BenchmarkingCluster(new ApacheCluster(builder)));
}
HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) {
@@ -112,15 +111,15 @@ class HttpRequestStrategy implements RequestStrategy {
return inflight.get() - delayedCount.get() > targetInflight();
}
- private boolean retry(SimpleHttpRequest request, int attempt) {
+ private boolean retry(HttpRequest request, int attempt) {
if (attempt > strategy.retries())
return false;
- switch (request.getMethod().toUpperCase()) {
+ switch (request.method().toUpperCase()) {
case "POST": return strategy.retry(FeedClient.OperationType.PUT);
case "PUT": return strategy.retry(FeedClient.OperationType.UPDATE);
case "DELETE": return strategy.retry(FeedClient.OperationType.REMOVE);
- default: throw new IllegalStateException("Unexpected HTTP method: " + request.getMethod());
+ default: throw new IllegalStateException("Unexpected HTTP method: " + request.method());
}
}
@@ -128,7 +127,7 @@ class HttpRequestStrategy implements RequestStrategy {
* Retries all IOExceptions, unless error rate has converged to a value higher than the threshold,
* or the user has turned off retries for this type of operation.
*/
- private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) {
+ private boolean retry(HttpRequest request, Throwable thrown, int attempt) {
breaker.failure();
log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request);
@@ -151,23 +150,23 @@ class HttpRequestStrategy implements RequestStrategy {
}
/** Retries throttled requests (429, 503), adjusting the target inflight count, and server errors (500, 502). */
- private boolean retry(SimpleHttpRequest request, SimpleHttpResponse response, int attempt) {
- if (response.getCode() / 100 == 2) {
+ private boolean retry(HttpRequest request, HttpResponse response, int attempt) {
+ if (response.code() / 100 == 2) {
breaker.success();
incrementTargetInflight();
return false;
}
- log.log(FINE, () -> "Status code " + response.getCode() + " (" + response.getBodyText() +
+ log.log(FINE, () -> "Status code " + response.code() + " (" + new String(response.body(), UTF_8) +
") on attempt " + attempt + " at " + request);
- if (response.getCode() == 429 || response.getCode() == 503) { // Throttling; reduce target inflight.
+ if (response.code() == 429 || response.code() == 503) { // Throttling; reduce target inflight.
decreaseTargetInflight();
return true;
}
breaker.failure();
- if (response.getCode() == 500 || response.getCode() == 502 || response.getCode() == 504) // Hopefully temporary errors.
+ if (response.code() == 500 || response.code() == 502 || response.code() == 504) // Hopefully temporary errors.
return retry(request, attempt);
return false;
@@ -205,9 +204,9 @@ class HttpRequestStrategy implements RequestStrategy {
}
@Override
- public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request) {
- CompletableFuture<SimpleHttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries.
- CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client.
+ public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) {
+ CompletableFuture<HttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries.
+ CompletableFuture<HttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client.
CompletableFuture<?> previous = inflightById.put(documentId, result);
if (destroyed.get()) {
result.cancel(true);
@@ -232,14 +231,14 @@ class HttpRequestStrategy implements RequestStrategy {
}
/** Handles the result of one attempt at the given operation, retrying if necessary. */
- private void handleAttempt(CompletableFuture<SimpleHttpResponse> vessel, SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result, int attempt) {
+ private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, CompletableFuture<HttpResponse> result, int attempt) {
vessel.whenCompleteAsync((response, thrown) -> {
// Retry the operation if it failed with a transient error ...
if (thrown != null ? retry(request, thrown, attempt)
: retry(request, response, attempt)) {
retries.incrementAndGet();
CircuitBreaker.State state = breaker.state();
- CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>();
+ CompletableFuture<HttpResponse> retry = new CompletableFuture<>();
offer(() -> cluster.dispatch(request, retry));
handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1));
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java
new file mode 100644
index 00000000000..b1dd54240eb
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java
@@ -0,0 +1,16 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+interface HttpResponse {
+
+ int code();
+ byte[] body();
+
+ static HttpResponse of(int code, byte[] body) {
+ return new HttpResponse() {
+ @Override public int code() { return code; }
+ @Override public byte[] body() { return body; }
+ };
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
index 2a6d2e15747..de32e7abdf5 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
@@ -58,18 +58,18 @@ public class JsonFeeder implements Closeable {
* @param result Non-null if operation completed successfully
* @param error Non-null if operation failed
*/
- void onNextResult(Result result, Throwable error);
+ default void onNextResult(Result result, Throwable error) { }
/**
* Invoked if an unrecoverable error occurred during feed processing,
* after which no other {@link ResultCallback} methods are invoked.
*/
- void onError(Throwable error);
+ default void onError(Throwable error) { }
/**
* Invoked when all feed operations are either completed successfully or failed.
*/
- void onComplete();
+ default void onComplete() { }
}
public static Builder builder(FeedClient client) { return new Builder(client); }
@@ -103,6 +103,10 @@ public class JsonFeeder implements Closeable {
return feedMany(jsonStream, 1 << 26, resultCallback);
}
+ public CompletableFuture<Void> feedMany(InputStream jsonStream) {
+ return feedMany(jsonStream, new ResultCallback() { });
+ }
+
CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) {
RingBufferStream buffer = new RingBufferStream(jsonStream, size);
CompletableFuture<Void> overallResult = new CompletableFuture<>();
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java
index 9305709d873..d36475a51fb 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java
@@ -42,6 +42,10 @@ public class OperationStats {
return requests - inflight;
}
+ public long successes() {
+ return responsesByCode.getOrDefault(200, 0L);
+ }
+
public Map<Integer, Long> responsesByCode() {
return responsesByCode;
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
index bc2707bc490..7764dce712b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
@@ -1,12 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-
-import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
/**
* Controls execution of feed operations.
@@ -28,6 +23,6 @@ interface RequestStrategy {
void await();
/** Enqueue the given operation, returning its future result. This may block if the client send queue is full. */
- CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request);
+ CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request);
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
index 65fbcb12204..0605ed5bf65 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
@@ -1,20 +1,18 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-import org.apache.hc.core5.http.ContentType;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.function.BiConsumer;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
/**
* @author jonmv
@@ -22,50 +20,82 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
class HttpFeedClientTest {
@Test
- void testRequestGeneration() throws IOException, ExecutionException, InterruptedException {
+ void testFeeding() throws ExecutionException, InterruptedException {
DocumentId id = DocumentId.of("ns", "type", "0");
+ AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
class MockRequestStrategy implements RequestStrategy {
@Override public OperationStats stats() { throw new UnsupportedOperationException(); }
@Override public boolean hasFailed() { return false; }
@Override public void destroy() { throw new UnsupportedOperationException(); }
@Override public void await() { throw new UnsupportedOperationException(); }
- @Override public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request) {
- try {
- assertEquals(id, documentId);
- assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route",
- request.getUri().toString());
- assertEquals("json", request.getBodyText());
+ @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); }
+ }
+ FeedClient client = new HttpFeedClient(FeedClientBuilder.create(URI.create("https://dummy:123")), new MockRequestStrategy());
- SimpleHttpResponse response = new SimpleHttpResponse(502);
- response.setBody("{\n" +
- " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" +
- " \"id\": \"id:ns:type::0\",\n" +
- " \"message\": \"Ooops! ... I did it again.\",\n" +
- " \"trace\": \"I played with your heart. Got lost in the game.\"\n" +
- "}",
- ContentType.APPLICATION_JSON);
- return CompletableFuture.completedFuture(response);
- }
- catch (Throwable thrown) {
- CompletableFuture<SimpleHttpResponse> failed = new CompletableFuture<>();
- failed.completeExceptionally(thrown);
- return failed;
- }
- }
+ // Vespa error is an error result.
+ dispatch.set((documentId, request) -> {
+ try {
+ assertEquals(id, documentId);
+ assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route",
+ request.path());
+ assertEquals("json", new String(request.body(), UTF_8));
- }
- Result result = new HttpFeedClient(FeedClientBuilder.create(URI.create("https://dummy:123")),
- new MockRequestStrategy())
- .put(id,
- "json",
- OperationParameters.empty()
- .createIfNonExistent(true)
- .testAndSetCondition("false")
- .route("route")
- .timeout(Duration.ofSeconds(5)))
- .get();
+ HttpResponse response = HttpResponse.of(502,
+ ("{\n" +
+ " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" +
+ " \"id\": \"id:ns:type::0\",\n" +
+ " \"message\": \"Ooops! ... I did it again.\",\n" +
+ " \"trace\": \"I played with your heart. Got lost in the game.\"\n" +
+ "}").getBytes(UTF_8));
+ return CompletableFuture.completedFuture(response);
+ }
+ catch (Throwable thrown) {
+ CompletableFuture<HttpResponse> failed = new CompletableFuture<>();
+ failed.completeExceptionally(thrown);
+ return failed;
+ }
+ });
+ Result result = client.put(id,
+ "json",
+ OperationParameters.empty()
+ .createIfNonExistent(true)
+ .testAndSetCondition("false")
+ .route("route")
+ .timeout(Duration.ofSeconds(5)))
+ .get();
assertEquals("Ooops! ... I did it again.", result.resultMessage().get());
assertEquals("I played with your heart. Got lost in the game.", result.traceMessage().get());
+
+
+ // Handler error is a FeedException.
+ dispatch.set((documentId, request) -> {
+ try {
+ assertEquals(id, documentId);
+ assertEquals("/document/v1/ns/type/docid/0",
+ request.path());
+ assertEquals("json", new String(request.body(), UTF_8));
+
+ HttpResponse response = HttpResponse.of(500,
+ ("{\n" +
+ " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" +
+ " \"id\": \"id:ns:type::0\",\n" +
+ " \"message\": \"Alla ska i jorden.\",\n" +
+ " \"trace\": \"Din tid den kom, och senn så for den. \"\n" +
+ "}").getBytes(UTF_8));
+ return CompletableFuture.completedFuture(response);
+ }
+ catch (Throwable thrown) {
+ CompletableFuture<HttpResponse> failed = new CompletableFuture<>();
+ failed.completeExceptionally(thrown);
+ return failed;
+ }
+ });
+ ExecutionException expected = assertThrows(ExecutionException.class,
+ () -> client.put(id,
+ "json",
+ OperationParameters.empty())
+ .get());
+ assertEquals("Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage());
}
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
index 7411f4124e5..d3005227184 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
@@ -2,8 +2,6 @@
package ai.vespa.feed.client;
import ai.vespa.feed.client.FeedClient.CircuitBreaker;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.junit.jupiter.api.Test;
@@ -35,9 +33,8 @@ class HttpRequestStrategyTest {
@Test
void testConcurrency() {
int documents = 1 << 16;
- SimpleHttpRequest request = new SimpleHttpRequest("PUT", "/");
- SimpleHttpResponse response = new SimpleHttpResponse(200);
- response.setBody("{}".getBytes(UTF_8), ContentType.APPLICATION_JSON);
+ HttpRequest request = new HttpRequest("PUT", "/", null, null);
+ HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), 100, TimeUnit.MILLISECONDS));
@@ -84,7 +81,7 @@ class HttpRequestStrategyTest {
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
- SimpleHttpRequest request = new SimpleHttpRequest("POST", "/");
+ HttpRequest request = new HttpRequest("POST", "/", null, null);
// Runtime exception is not retried.
cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
@@ -101,17 +98,17 @@ class HttpRequestStrategyTest {
assertEquals(3, strategy.stats().requests());
// Successful response is returned
- SimpleHttpResponse success = new SimpleHttpResponse(200);
+ HttpResponse success = HttpResponse.of(200, null);
cluster.expect((__, vessel) -> vessel.complete(success));
assertEquals(success, strategy.enqueue(id1, request).get());
assertEquals(4, strategy.stats().requests());
// Throttled requests are retried. Concurrent operations to same ID (only) are serialised.
now.set(2000);
- SimpleHttpResponse throttled = new SimpleHttpResponse(429);
+ HttpResponse throttled = HttpResponse.of(429, null);
AtomicInteger count = new AtomicInteger(3);
CountDownLatch latch = new CountDownLatch(1);
- AtomicReference<CompletableFuture<SimpleHttpResponse>> completion = new AtomicReference<>();
+ AtomicReference<CompletableFuture<HttpResponse>> completion = new AtomicReference<>();
cluster.expect((req, vessel) -> {
if (req == request) {
if (count.decrementAndGet() > 0)
@@ -123,9 +120,9 @@ class HttpRequestStrategyTest {
}
else vessel.complete(success);
});
- CompletableFuture<SimpleHttpResponse> delayed = strategy.enqueue(id1, request);
- CompletableFuture<SimpleHttpResponse> serialised = strategy.enqueue(id1, new SimpleHttpRequest("PUT", "/"));
- assertEquals(success, strategy.enqueue(id2, new SimpleHttpRequest("DELETE", "/")).get());
+ CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request);
+ CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null));
+ assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null)).get());
latch.await();
assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2.
now.set(4000);
@@ -135,7 +132,7 @@ class HttpRequestStrategyTest {
assertEquals(success, serialised.get());
// Some error responses are retried.
- SimpleHttpResponse serverError = new SimpleHttpResponse(500);
+ HttpResponse serverError = HttpResponse.of(500, null);
cluster.expect((__, vessel) -> vessel.complete(serverError));
assertEquals(serverError, strategy.enqueue(id1, request).get());
assertEquals(11, strategy.stats().requests());
@@ -143,11 +140,11 @@ class HttpRequestStrategyTest {
// Error responses are not retried when not of appropriate type.
cluster.expect((__, vessel) -> vessel.complete(serverError));
- assertEquals(serverError, strategy.enqueue(id1, new SimpleHttpRequest("PUT", "/")).get());
+ assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)).get());
assertEquals(12, strategy.stats().requests());
// Some error responses are not retried.
- SimpleHttpResponse badRequest = new SimpleHttpResponse(400);
+ HttpResponse badRequest = HttpResponse.of(400, null);
cluster.expect((__, vessel) -> vessel.complete(badRequest));
assertEquals(badRequest, strategy.enqueue(id1, request).get());
assertEquals(13, strategy.stats().requests());
@@ -169,14 +166,14 @@ class HttpRequestStrategyTest {
static class MockCluster implements Cluster {
- final AtomicReference<BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>>> dispatch = new AtomicReference<>();
+ final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
- void expect(BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> expected) {
+ void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) {
dispatch.set(expected);
}
@Override
- public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) {
+ public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
dispatch.get().accept(request, vessel);
}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 789a2ab537f..9db296e33cd 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -122,7 +122,7 @@ import static java.util.stream.Collectors.toUnmodifiableMap;
*/
public class DocumentV1ApiHandler extends AbstractRequestHandler {
- private static final Duration defaultTimeout = Duration.ofSeconds(175);
+ private static final Duration defaultTimeout = Duration.ofSeconds(180); // Match document API default timeout.
private static final Logger log = Logger.getLogger(DocumentV1ApiHandler.class.getName());
private static final Parser<Integer> integerParser = Integer::parseInt;
@@ -160,6 +160,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static final String TRACELEVEL = "tracelevel";
private final Clock clock;
+ private final Duration handlerTimeout;
private final Metric metric;
private final DocumentApiMetrics metrics;
private final DocumentOperationParser parser;
@@ -184,14 +185,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
ClusterListConfig clusterListConfig,
AllClustersBucketSpacesConfig bucketSpacesConfig,
DocumentOperationExecutorConfig executorConfig) {
- this(Clock.systemUTC(), metric, metricReceiver, documentAccess,
+ this(Clock.systemUTC(), Duration.ofSeconds(5), metric, metricReceiver, documentAccess,
documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig);
}
- DocumentV1ApiHandler(Clock clock, Metric metric, MetricReceiver metricReceiver, DocumentAccess access,
+ DocumentV1ApiHandler(Clock clock, Duration handlerTimeout, Metric metric, MetricReceiver metricReceiver, DocumentAccess access,
DocumentmanagerConfig documentmanagerConfig, DocumentOperationExecutorConfig executorConfig,
ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig) {
this.clock = clock;
+ this.handlerTimeout = handlerTimeout;
this.parser = new DocumentOperationParser(documentmanagerConfig);
this.metric = metric;
this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
@@ -222,8 +224,9 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
HttpRequest request = (HttpRequest) rawRequest;
try {
- request.setTimeout(getProperty(request, TIMEOUT, timeoutMillisParser)
- .orElse(defaultTimeout.toMillis()),
+ // Set a higher HTTP layer timeout than the document API timeout, to prefer triggering the latter.
+ request.setTimeout( getProperty(request, TIMEOUT, timeoutMillisParser).orElse(defaultTimeout.toMillis())
+ + handlerTimeout.toMillis(),
TimeUnit.MILLISECONDS);
Path requestPath = new Path(request.getUri());
@@ -251,7 +254,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
@Override
public void handleTimeout(Request request, ResponseHandler responseHandler) {
- timeout((HttpRequest) request, "Request timeout after " + request.getTimeout(TimeUnit.MILLISECONDS) + "ms", responseHandler);
+ timeout((HttpRequest) request, "Timeout after " + (request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()) + "ms", responseHandler);
}
@Override
@@ -970,7 +973,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
parameters.setMaxTotalHits(wantedDocumentCount);
parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency));
parameters.visitInconsistentBuckets(true);
- parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - 5000));
+ parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()));
return parameters;
}
@@ -980,7 +983,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
VisitorParameters parameters = parseCommonParameters(request, path, Optional.of(requireProperty(request, CLUSTER)));
parameters.setThrottlePolicy(new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1));
long timeChunk = getProperty(request, TIME_CHUNK, timeoutMillisParser).orElse(60_000L);
- parameters.setSessionTimeoutMs(Math.max(1, Math.min(timeChunk, request.getTimeout(TimeUnit.MILLISECONDS) - 5000L)));
+ parameters.setSessionTimeoutMs(Math.max(1, Math.min(timeChunk, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis())));
return parameters;
}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 49fa15849ce..29ae7f52265 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -60,6 +60,7 @@ import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -133,7 +134,7 @@ public class DocumentV1ApiTest {
access = new MockDocumentAccess(docConfig);
metric = new NullMetric();
metrics = new MetricReceiver.MockReceiver();
- handler = new DocumentV1ApiHandler(clock, metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig);
+ handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig);
}
@After
@@ -179,7 +180,7 @@ public class DocumentV1ApiTest {
}
@Test
- public void testResponses() {
+ public void testResponses() throws InterruptedException {
RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null));
// GET at non-existent path returns 404 with available paths
@@ -207,7 +208,7 @@ public class DocumentV1ApiTest {
assertEquals(100, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount());
assertEquals("[id]", parameters.getFieldSet());
assertEquals("(all the things)", parameters.getDocumentSelection());
- assertEquals(1000, parameters.getSessionTimeoutMs());
+ assertEquals(6000, parameters.getSessionTimeoutMs());
// Put some documents in the response
parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0));
parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1));
@@ -272,7 +273,7 @@ public class DocumentV1ApiTest {
access.expect(parameters -> {
assertEquals("[Content:cluster=content]", parameters.getRemoteDataHandler());
assertEquals("[all]", parameters.fieldSet());
- assertEquals(55_000L, parameters.getSessionTimeoutMs());
+ assertEquals(60_000L, parameters.getSessionTimeoutMs());
parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "We made it!");
});
response = driver.sendRequest("http://localhost/document/v1/space/music/docid?destinationCluster=content&selection=true&cluster=content&timeout=60", POST);
@@ -666,14 +667,18 @@ public class DocumentV1ApiTest {
handler.set(parameters.responseHandler().get());
return new Result(Result.ResultType.SUCCESS, null);
});
- var response4 = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?timeout=1ms");
- assertSameJson("{" +
- " \"pathId\": \"/document/v1/space/music/docid/one\"," +
- " \"message\": \"Request timeout after 1ms\"" +
- "}", response4.readAll());
- assertEquals(504, response4.getStatus());
- if (handler.get() != null) // Timeout may have occurred before dispatch, or ...
- handler.get().handleResponse(new Response(0)); // response may eventually arrive, but too late.
+ try {
+ var response4 = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?timeout=1ms");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/docid/one\"," +
+ " \"message\": \"Timeout after 1ms\"" +
+ "}", response4.readAll());
+ assertEquals(504, response4.getStatus());
+ }
+ finally {
+ if (handler.get() != null) // Timeout may have occurred before dispatch, or ...
+ handler.get().handleResponse(new Response(0)); // response may eventually arrive, but too late.
+ }
driver.close();
}
@@ -681,7 +686,7 @@ public class DocumentV1ApiTest {
@Test
public void testThroughput() throws InterruptedException {
DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder().build();
- handler = new DocumentV1ApiHandler(clock, metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig);
+ handler = new DocumentV1ApiHandler(clock, Duration.ofMillis(1), metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig);
int writers = 4;
int queueFill = executorConfig.maxThrottled() - writers;
@@ -742,7 +747,7 @@ public class DocumentV1ApiTest {
});
}
latch.await();
- System.err.println((System.nanoTime() - startNanos) * 1e-9 + " seconds total");
+ System.err.println(docs + " requests in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
assertNull(failed.get());
driver.close();