aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-10 12:09:48 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-10 12:09:48 +0200
commit54da1662cb6891c88db864b02eb05dbfd7d94c14 (patch)
tree531b9957691c310c6c1aae6d5aa38ee44183ffc7
parentbecde905f311604fb3cefa53011618cd89fbad7e (diff)
Hide client library behind HttpRequest and HttpResponse
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java)36
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java11
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java5
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java23
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java41
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java33
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java16
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java7
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java53
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java33
10 files changed, 158 insertions, 100 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
index 5099458f3fe..672f5f080b5 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
@@ -8,6 +8,7 @@ import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.net.URIAuthority;
@@ -29,18 +30,23 @@ import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak;
/**
* @author jonmv
*/
-class HttpCluster implements Cluster {
+class ApacheCluster implements Cluster {
private final List<Endpoint> endpoints = new ArrayList<>();
- public HttpCluster(FeedClientBuilder builder) throws IOException {
+ public ApacheCluster(FeedClientBuilder builder) throws IOException {
for (URI endpoint : builder.endpoints)
for (int i = 0; i < builder.connectionsPerEndpoint; i++)
endpoints.add(new Endpoint(createHttpClient(builder), endpoint));
}
@Override
- public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) {
+ public void dispatch(HttpRequest wrapped, CompletableFuture<HttpResponse> vessel) {
+ SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path());
+ wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get()));
+ if (wrapped.body() != null)
+ request.setBody(wrapped.body(), ContentType.APPLICATION_JSON);
+
int index = 0;
int min = Integer.MAX_VALUE;
for (int i = 0; i < endpoints.size(); i++)
@@ -56,7 +62,7 @@ class HttpCluster implements Cluster {
request.setAuthority(new URIAuthority(endpoint.url.getHost(), endpoint.url.getPort()));
endpoint.client.execute(request,
new FutureCallback<SimpleHttpResponse>() {
- @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); }
+ @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); }
@Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
@Override public void cancelled() { vessel.cancel(false); }
});
@@ -148,4 +154,26 @@ class HttpCluster implements Cluster {
return sslContextBuilder.build();
}
+
+ private static class ApacheHttpResponse implements HttpResponse {
+
+ private final SimpleHttpResponse wrapped;
+
+ private ApacheHttpResponse(SimpleHttpResponse wrapped) {
+ this.wrapped = wrapped;
+ }
+
+
+ @Override
+ public int code() {
+ return wrapped.getCode();
+ }
+
+ @Override
+ public byte[] body() {
+ return wrapped.getBodyBytes();
+ }
+
+ }
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java
index 1ae8ae1d490..0e9bfe0ef46 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java
@@ -1,9 +1,6 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -41,7 +38,7 @@ public class BenchmarkingCluster implements Cluster {
}
@Override
- public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) {
+ public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
requests.incrementAndGet();
long startMillis = System.currentTimeMillis();
delegate.dispatch(request, vessel);
@@ -49,13 +46,13 @@ public class BenchmarkingCluster implements Cluster {
results++;
if (thrown == null) {
responses++;
- responsesByCode[response.getCode()]++;
+ responsesByCode[response.code()]++;
long latency = System.currentTimeMillis() - startMillis;
totalLatencyMillis += latency;
minLatencyMillis = Math.min(minLatencyMillis, latency);
maxLatencyMillis = Math.max(maxLatencyMillis, latency);
- bytesSent += request.getBodyBytes() == null ? 0 : request.getBodyBytes().length;
- bytesReceived += response.getBodyBytes() == null ? 0 : response.getBodyBytes().length;
+ bytesSent += request.body() == null ? 0 : request.body().length;
+ bytesReceived += response.body() == null ? 0 : response.body().length;
}
else
exceptions++;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
index bcf1c4ae107..f428fb567e6 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
@@ -1,9 +1,6 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-
import java.io.Closeable;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
@@ -14,7 +11,7 @@ import java.util.concurrent.CompletableFuture;
interface Cluster extends Closeable {
/** Dispatch the request to the cluster, causing the response vessel to complete at a later time. May not throw. */
- void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel);
+ void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel);
@Override
default void close() { }
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
index 82f74170230..c572f84db54 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
@@ -4,8 +4,6 @@ package ai.vespa.feed.client;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.net.URIBuilder;
@@ -13,6 +11,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -21,6 +20,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
/**
@@ -87,18 +87,15 @@ class HttpFeedClient implements FeedClient {
ensureOpen();
String path = operationPath(documentId, params).toString();
- SimpleHttpRequest request = new SimpleHttpRequest(method, path);
- requestHeaders.forEach((name, value) -> request.setHeader(name, value.get()));
- if (operationJson != null)
- request.setBody(operationJson, ContentType.APPLICATION_JSON);
+ HttpRequest request = new HttpRequest(method, path, requestHeaders, operationJson.getBytes(UTF_8)); // TODO: make it bytes all the way?
return requestStrategy.enqueue(documentId, request)
.thenApply(response -> toResult(request, response, documentId));
}
- static Result toResult(SimpleHttpRequest request, SimpleHttpResponse response, DocumentId documentId) {
+ static Result toResult(HttpRequest request, HttpResponse response, DocumentId documentId) {
Result.Type type;
- switch (response.getCode()) {
+ switch (response.code()) {
case 200: type = Result.Type.success; break;
case 412: type = Result.Type.conditionNotMet; break;
case 502:
@@ -110,9 +107,9 @@ class HttpFeedClient implements FeedClient {
String message = null;
String trace = null;
try {
- JsonParser parser = factory.createParser(response.getBodyText());
+ JsonParser parser = factory.createParser(response.body());
if (parser.nextToken() != JsonToken.START_OBJECT)
- throw new IllegalArgumentException("Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " + response.getBodyText());
+ throw new IllegalArgumentException("Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " + new String(response.body(), UTF_8));
String name;
while ((name = parser.nextFieldName()) != null) {
@@ -124,15 +121,15 @@ class HttpFeedClient implements FeedClient {
}
if (parser.currentToken() != JsonToken.END_OBJECT)
- throw new IllegalArgumentException("Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: " + response.getBodyText());
+ throw new IllegalArgumentException("Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: " + new String(response.body(), UTF_8));
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
if (type == null) // Not a Vespa response, but a failure in the HTTP layer.
- throw new FeedException("Status " + response.getCode() + " executing '" + request +
- "': " + (message == null ? response.getBodyText() : message));
+ throw new FeedException("Status " + response.code() + " executing '" + request +
+ "': " + (message == null ? new String(response.body(), UTF_8) : message));
return new Result(type, documentId, message, trace);
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java
index c3fb37fe4df..8da2f46def2 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java
@@ -1,3 +1,42 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;public class HttpRequest {
+package ai.vespa.feed.client;
+
+import java.util.Map;
+import java.util.function.Supplier;
+
+class HttpRequest {
+
+ private final String method;
+ private final String path;
+ private final Map<String, Supplier<String>> headers;
+ private final byte[] body;
+
+ public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body) {
+ this.method = method;
+ this.path = path;
+ this.headers = headers;
+ this.body = body;
+ }
+
+ public String method() {
+ return method;
+ }
+
+ public String path() {
+ return path;
+ }
+
+ public Map<String, Supplier<String>> headers() {
+ return headers;
+ }
+
+ public byte[] body() {
+ return body;
+ }
+
+ @Override
+ public String toString() {
+ return method + " " + path;
+ }
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 408488cbaec..5646d37cde3 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -3,8 +3,6 @@ package ai.vespa.feed.client;
import ai.vespa.feed.client.FeedClient.CircuitBreaker;
import ai.vespa.feed.client.FeedClient.RetryStrategy;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import java.io.IOException;
import java.util.Map;
@@ -23,6 +21,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
import static java.lang.Math.max;
import static java.lang.Math.min;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
@@ -60,7 +59,7 @@ class HttpRequestStrategy implements RequestStrategy {
});
HttpRequestStrategy(FeedClientBuilder builder) throws IOException {
- this(builder, new BenchmarkingCluster(new HttpCluster(builder)));
+ this(builder, new BenchmarkingCluster(new ApacheCluster(builder)));
}
HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) {
@@ -112,15 +111,15 @@ class HttpRequestStrategy implements RequestStrategy {
return inflight.get() - delayedCount.get() > targetInflight();
}
- private boolean retry(SimpleHttpRequest request, int attempt) {
+ private boolean retry(HttpRequest request, int attempt) {
if (attempt > strategy.retries())
return false;
- switch (request.getMethod().toUpperCase()) {
+ switch (request.method().toUpperCase()) {
case "POST": return strategy.retry(FeedClient.OperationType.PUT);
case "PUT": return strategy.retry(FeedClient.OperationType.UPDATE);
case "DELETE": return strategy.retry(FeedClient.OperationType.REMOVE);
- default: throw new IllegalStateException("Unexpected HTTP method: " + request.getMethod());
+ default: throw new IllegalStateException("Unexpected HTTP method: " + request.method());
}
}
@@ -128,7 +127,7 @@ class HttpRequestStrategy implements RequestStrategy {
* Retries all IOExceptions, unless error rate has converged to a value higher than the threshold,
* or the user has turned off retries for this type of operation.
*/
- private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) {
+ private boolean retry(HttpRequest request, Throwable thrown, int attempt) {
breaker.failure();
log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request);
@@ -151,23 +150,23 @@ class HttpRequestStrategy implements RequestStrategy {
}
/** Retries throttled requests (429, 503), adjusting the target inflight count, and server errors (500, 502). */
- private boolean retry(SimpleHttpRequest request, SimpleHttpResponse response, int attempt) {
- if (response.getCode() / 100 == 2) {
+ private boolean retry(HttpRequest request, HttpResponse response, int attempt) {
+ if (response.code() / 100 == 2) {
breaker.success();
incrementTargetInflight();
return false;
}
- log.log(FINE, () -> "Status code " + response.getCode() + " (" + response.getBodyText() +
+ log.log(FINE, () -> "Status code " + response.code() + " (" + new String(response.body(), UTF_8) +
") on attempt " + attempt + " at " + request);
- if (response.getCode() == 429 || response.getCode() == 503) { // Throttling; reduce target inflight.
+ if (response.code() == 429 || response.code() == 503) { // Throttling; reduce target inflight.
decreaseTargetInflight();
return true;
}
breaker.failure();
- if (response.getCode() == 500 || response.getCode() == 502 || response.getCode() == 504) // Hopefully temporary errors.
+ if (response.code() == 500 || response.code() == 502 || response.code() == 504) // Hopefully temporary errors.
return retry(request, attempt);
return false;
@@ -205,9 +204,9 @@ class HttpRequestStrategy implements RequestStrategy {
}
@Override
- public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request) {
- CompletableFuture<SimpleHttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries.
- CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client.
+ public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) {
+ CompletableFuture<HttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries.
+ CompletableFuture<HttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client.
CompletableFuture<?> previous = inflightById.put(documentId, result);
if (destroyed.get()) {
result.cancel(true);
@@ -232,14 +231,14 @@ class HttpRequestStrategy implements RequestStrategy {
}
/** Handles the result of one attempt at the given operation, retrying if necessary. */
- private void handleAttempt(CompletableFuture<SimpleHttpResponse> vessel, SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result, int attempt) {
+ private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, CompletableFuture<HttpResponse> result, int attempt) {
vessel.whenCompleteAsync((response, thrown) -> {
// Retry the operation if it failed with a transient error ...
if (thrown != null ? retry(request, thrown, attempt)
: retry(request, response, attempt)) {
retries.incrementAndGet();
CircuitBreaker.State state = breaker.state();
- CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>();
+ CompletableFuture<HttpResponse> retry = new CompletableFuture<>();
offer(() -> cluster.dispatch(request, retry));
handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1));
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java
new file mode 100644
index 00000000000..b1dd54240eb
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java
@@ -0,0 +1,16 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+interface HttpResponse {
+
+ int code();
+ byte[] body();
+
+ static HttpResponse of(int code, byte[] body) {
+ return new HttpResponse() {
+ @Override public int code() { return code; }
+ @Override public byte[] body() { return body; }
+ };
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
index bc2707bc490..7764dce712b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
@@ -1,12 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-
-import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
/**
* Controls execution of feed operations.
@@ -28,6 +23,6 @@ interface RequestStrategy {
void await();
/** Enqueue the given operation, returning its future result. This may block if the client send queue is full. */
- CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request);
+ CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request);
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
index 6155d8bf3b6..0605ed5bf65 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
@@ -1,21 +1,16 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-import org.apache.hc.core5.http.ContentType;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
import java.util.function.BiFunction;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -27,13 +22,13 @@ class HttpFeedClientTest {
@Test
void testFeeding() throws ExecutionException, InterruptedException {
DocumentId id = DocumentId.of("ns", "type", "0");
- AtomicReference<BiFunction<DocumentId, SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>>> dispatch = new AtomicReference<>();
+ AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
class MockRequestStrategy implements RequestStrategy {
@Override public OperationStats stats() { throw new UnsupportedOperationException(); }
@Override public boolean hasFailed() { return false; }
@Override public void destroy() { throw new UnsupportedOperationException(); }
@Override public void await() { throw new UnsupportedOperationException(); }
- @Override public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request) { return dispatch.get().apply(documentId, request); }
+ @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); }
}
FeedClient client = new HttpFeedClient(FeedClientBuilder.create(URI.create("https://dummy:123")), new MockRequestStrategy());
@@ -42,21 +37,20 @@ class HttpFeedClientTest {
try {
assertEquals(id, documentId);
assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route",
- request.getUri().toString());
- assertEquals("json", request.getBodyText());
+ request.path());
+ assertEquals("json", new String(request.body(), UTF_8));
- SimpleHttpResponse response = new SimpleHttpResponse(502);
- response.setBody("{\n" +
- " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" +
- " \"id\": \"id:ns:type::0\",\n" +
- " \"message\": \"Ooops! ... I did it again.\",\n" +
- " \"trace\": \"I played with your heart. Got lost in the game.\"\n" +
- "}",
- ContentType.APPLICATION_JSON);
+ HttpResponse response = HttpResponse.of(502,
+ ("{\n" +
+ " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" +
+ " \"id\": \"id:ns:type::0\",\n" +
+ " \"message\": \"Ooops! ... I did it again.\",\n" +
+ " \"trace\": \"I played with your heart. Got lost in the game.\"\n" +
+ "}").getBytes(UTF_8));
return CompletableFuture.completedFuture(response);
}
catch (Throwable thrown) {
- CompletableFuture<SimpleHttpResponse> failed = new CompletableFuture<>();
+ CompletableFuture<HttpResponse> failed = new CompletableFuture<>();
failed.completeExceptionally(thrown);
return failed;
}
@@ -78,21 +72,20 @@ class HttpFeedClientTest {
try {
assertEquals(id, documentId);
assertEquals("/document/v1/ns/type/docid/0",
- request.getUri().toString());
- assertEquals("json", request.getBodyText());
+ request.path());
+ assertEquals("json", new String(request.body(), UTF_8));
- SimpleHttpResponse response = new SimpleHttpResponse(500);
- response.setBody("{\n" +
- " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" +
- " \"id\": \"id:ns:type::0\",\n" +
- " \"message\": \"Alla ska i jorden.\",\n" +
- " \"trace\": \"Din tid den kom, och senn så for den. \"\n" +
- "}",
- ContentType.APPLICATION_JSON);
+ HttpResponse response = HttpResponse.of(500,
+ ("{\n" +
+ " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" +
+ " \"id\": \"id:ns:type::0\",\n" +
+ " \"message\": \"Alla ska i jorden.\",\n" +
+ " \"trace\": \"Din tid den kom, och senn så for den. \"\n" +
+ "}").getBytes(UTF_8));
return CompletableFuture.completedFuture(response);
}
catch (Throwable thrown) {
- CompletableFuture<SimpleHttpResponse> failed = new CompletableFuture<>();
+ CompletableFuture<HttpResponse> failed = new CompletableFuture<>();
failed.completeExceptionally(thrown);
return failed;
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
index 7411f4124e5..d3005227184 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
@@ -2,8 +2,6 @@
package ai.vespa.feed.client;
import ai.vespa.feed.client.FeedClient.CircuitBreaker;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.junit.jupiter.api.Test;
@@ -35,9 +33,8 @@ class HttpRequestStrategyTest {
@Test
void testConcurrency() {
int documents = 1 << 16;
- SimpleHttpRequest request = new SimpleHttpRequest("PUT", "/");
- SimpleHttpResponse response = new SimpleHttpResponse(200);
- response.setBody("{}".getBytes(UTF_8), ContentType.APPLICATION_JSON);
+ HttpRequest request = new HttpRequest("PUT", "/", null, null);
+ HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), 100, TimeUnit.MILLISECONDS));
@@ -84,7 +81,7 @@ class HttpRequestStrategyTest {
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
- SimpleHttpRequest request = new SimpleHttpRequest("POST", "/");
+ HttpRequest request = new HttpRequest("POST", "/", null, null);
// Runtime exception is not retried.
cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
@@ -101,17 +98,17 @@ class HttpRequestStrategyTest {
assertEquals(3, strategy.stats().requests());
// Successful response is returned
- SimpleHttpResponse success = new SimpleHttpResponse(200);
+ HttpResponse success = HttpResponse.of(200, null);
cluster.expect((__, vessel) -> vessel.complete(success));
assertEquals(success, strategy.enqueue(id1, request).get());
assertEquals(4, strategy.stats().requests());
// Throttled requests are retried. Concurrent operations to same ID (only) are serialised.
now.set(2000);
- SimpleHttpResponse throttled = new SimpleHttpResponse(429);
+ HttpResponse throttled = HttpResponse.of(429, null);
AtomicInteger count = new AtomicInteger(3);
CountDownLatch latch = new CountDownLatch(1);
- AtomicReference<CompletableFuture<SimpleHttpResponse>> completion = new AtomicReference<>();
+ AtomicReference<CompletableFuture<HttpResponse>> completion = new AtomicReference<>();
cluster.expect((req, vessel) -> {
if (req == request) {
if (count.decrementAndGet() > 0)
@@ -123,9 +120,9 @@ class HttpRequestStrategyTest {
}
else vessel.complete(success);
});
- CompletableFuture<SimpleHttpResponse> delayed = strategy.enqueue(id1, request);
- CompletableFuture<SimpleHttpResponse> serialised = strategy.enqueue(id1, new SimpleHttpRequest("PUT", "/"));
- assertEquals(success, strategy.enqueue(id2, new SimpleHttpRequest("DELETE", "/")).get());
+ CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request);
+ CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null));
+ assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null)).get());
latch.await();
assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2.
now.set(4000);
@@ -135,7 +132,7 @@ class HttpRequestStrategyTest {
assertEquals(success, serialised.get());
// Some error responses are retried.
- SimpleHttpResponse serverError = new SimpleHttpResponse(500);
+ HttpResponse serverError = HttpResponse.of(500, null);
cluster.expect((__, vessel) -> vessel.complete(serverError));
assertEquals(serverError, strategy.enqueue(id1, request).get());
assertEquals(11, strategy.stats().requests());
@@ -143,11 +140,11 @@ class HttpRequestStrategyTest {
// Error responses are not retried when not of appropriate type.
cluster.expect((__, vessel) -> vessel.complete(serverError));
- assertEquals(serverError, strategy.enqueue(id1, new SimpleHttpRequest("PUT", "/")).get());
+ assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)).get());
assertEquals(12, strategy.stats().requests());
// Some error responses are not retried.
- SimpleHttpResponse badRequest = new SimpleHttpResponse(400);
+ HttpResponse badRequest = HttpResponse.of(400, null);
cluster.expect((__, vessel) -> vessel.complete(badRequest));
assertEquals(badRequest, strategy.enqueue(id1, request).get());
assertEquals(13, strategy.stats().requests());
@@ -169,14 +166,14 @@ class HttpRequestStrategyTest {
static class MockCluster implements Cluster {
- final AtomicReference<BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>>> dispatch = new AtomicReference<>();
+ final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
- void expect(BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> expected) {
+ void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) {
dispatch.set(expected);
}
@Override
- public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) {
+ public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
dispatch.get().accept(request, vessel);
}