aboutsummaryrefslogtreecommitdiffstats
path: root/configserver-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-04-26 16:41:58 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-04-26 16:41:58 +0200
commitde80724b756e8f3e617755e838189948b8d835cc (patch)
tree619147321378a14274612c6dad3b13438ac94dd1 /configserver-client
parent9458ad2d810ff988ccc19cd8554bdbc78dabe588 (diff)
Revert "Merge pull request #17603 from vespa-engine/jonmv/revert-apache-config-server-client"
This reverts commit 9458ad2d810ff988ccc19cd8554bdbc78dabe588, reversing changes made to 6e94e192a07a540bff9b3e13ad05c8d0af093928.
Diffstat (limited to 'configserver-client')
-rw-r--r--configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java83
-rw-r--r--configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java141
-rw-r--r--configserver-client/src/main/java/ai/vespa/hosted/client/HttpConfigServerClient.java1
-rw-r--r--configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java8
4 files changed, 156 insertions, 77 deletions
diff --git a/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java b/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java
index d467402f79f..2e5e445d63a 100644
--- a/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java
+++ b/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java
@@ -9,11 +9,12 @@ import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.Method;
+import org.apache.hc.core5.http.ParseException;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.HttpEntities;
import org.apache.hc.core5.net.URIBuilder;
import java.io.IOException;
-import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -21,7 +22,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -104,7 +104,7 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient {
}
@Override
- public RequestBuilder send(HostStrategy hosts, Method method) {
+ public ConfigServerClient.RequestBuilder send(HostStrategy hosts, Method method) {
return new RequestBuilder(hosts, method);
}
@@ -114,10 +114,11 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient {
private final Method method;
private final HostStrategy hosts;
private final URIBuilder uriBuilder = new URIBuilder();
+ private final List<String> pathSegments = new ArrayList<>();
private HttpEntity entity;
private RequestConfig config = ConfigServerClient.defaultRequestConfig;
- private BiConsumer<ClassicHttpResponse, ClassicHttpRequest> handler = ConfigServerClient::throwOnError;
- private Consumer<IOException> catcher = ConfigServerClient::retryAll;
+ private ResponseVerifier verifier = ConfigServerClient.throwOnError;
+ private Consumer<IOException> catcher = ConfigServerClient.retryAll;
private RequestBuilder(HostStrategy hosts, Method method) {
if ( ! hosts.iterator().hasNext())
@@ -129,7 +130,7 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient {
@Override
public RequestBuilder at(List<String> pathSegments) {
- uriBuilder.setPathSegments(requireNonNull(pathSegments));
+ this.pathSegments.addAll(pathSegments);
return this;
}
@@ -145,12 +146,23 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient {
}
@Override
+ public ConfigServerClient.RequestBuilder emptyParameters(List<String> keys) {
+ for (String key : keys)
+ uriBuilder.setParameter(key, null);
+
+ return this;
+ }
+
+ @Override
public RequestBuilder parameters(List<String> pairs) {
if (pairs.size() % 2 != 0)
throw new IllegalArgumentException("Must supply parameter key/values in pairs");
- for (int i = 0; i < pairs.size(); )
- uriBuilder.setParameter(pairs.get(i++), pairs.get(i++));
+ for (int i = 0; i < pairs.size(); ) {
+ String key = pairs.get(i++), value = pairs.get(i++);
+ if (value != null)
+ uriBuilder.setParameter(key, value);
+ }
return this;
}
@@ -175,56 +187,54 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient {
}
@Override
- public RequestBuilder handling(BiConsumer<ClassicHttpResponse, ClassicHttpRequest> handler) {
- this.handler = requireNonNull(handler);
+ public RequestBuilder throwing(ResponseVerifier verifier) {
+ this.verifier = requireNonNull(verifier);
return this;
}
@Override
- public <T> T read(Function<byte[], T> mapper) {
- return mapIfSuccess(input -> {
- try (input) {
- return mapper.apply(input.readAllBytes());
+ public String read() {
+ return handle((response, __) -> {
+ try (response) {
+ return response.getEntity() == null ? "" : EntityUtils.toString(response.getEntity());
}
- catch (IOException e) {
- throw new RetryException(e);
+ catch (ParseException e) {
+ throw new IllegalStateException(e); // This isn't actually thrown by apache >_<
+ }
+ });
+ }
+
+ @Override
+ public <T> T read(Function<byte[], T> mapper) {
+ return handle((response, __) -> {
+ try (response) {
+ return mapper.apply(response.getEntity() == null ? new byte[0] : EntityUtils.toByteArray(response.getEntity()));
}
});
}
@Override
public void discard() throws UncheckedIOException, ResponseException {
- mapIfSuccess(input -> {
- try (input) {
+ handle((response, __) -> {
+ try (response) {
return null;
}
- catch (IOException e) {
- throw new RetryException(e);
- }
});
}
@Override
- public InputStream stream() throws UncheckedIOException, ResponseException {
- return mapIfSuccess(input -> input);
+ public HttpInputStream stream() throws UncheckedIOException, ResponseException {
+ return handle((response, __) -> new HttpInputStream(response));
}
- /** Returns the mapped body, if successful, retrying any IOException. The caller must close the body stream. */
- private <T> T mapIfSuccess(Function<InputStream, T> mapper) {
+ @Override
+ public <T> T handle(ResponseHandler<T> handler) {
+ uriBuilder.setPathSegments(pathSegments);
return execute(this,
(response, request) -> {
try {
- handler.accept(response, request); // This throws on unacceptable responses.
-
- InputStream body = response.getEntity() != null ? response.getEntity().getContent()
- : InputStream.nullInputStream();
- return mapper.apply(new ForwardingInputStream(body) {
- @Override
- public void close() throws IOException {
- super.close();
- response.close();
- }
- });
+ verifier.verify(response, request); // This throws on unacceptable responses.
+ return handler.handle(response, request);
}
catch (IOException | RuntimeException | Error e) {
try {
@@ -247,6 +257,7 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient {
}
+
@SuppressWarnings("unchecked")
private static <T extends Throwable> void sneakyThrow(Throwable t) throws T {
throw (T) t;
diff --git a/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java b/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java
index c0f72378c1b..c92acd7cd0b 100644
--- a/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java
+++ b/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java
@@ -7,10 +7,10 @@ import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.Method;
-import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.util.Timeout;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
@@ -20,18 +20,18 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.IntStream;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toUnmodifiableList;
/**
* @author jonmv
*/
-public interface ConfigServerClient extends AutoCloseable {
+public interface ConfigServerClient extends Closeable {
RequestConfig defaultRequestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(Timeout.ofSeconds(5))
@@ -40,20 +40,12 @@ public interface ConfigServerClient extends AutoCloseable {
.build();
/** Wraps with a {@link RetryException} and rethrows. */
- static void retryAll(IOException e) {
+ Consumer<IOException> retryAll = (e) -> {
throw new RetryException(e);
- }
+ };
/** Throws a a {@link RetryException} if {@code statusCode == 503}, or a {@link ResponseException} unless {@code 200 <= statusCode < 300}. */
- static void throwOnError(ClassicHttpResponse response, ClassicHttpRequest request) {
- if (response.getCode() < HttpStatus.SC_OK || response.getCode() >= HttpStatus.SC_REDIRECTION) {
- ResponseException e = ResponseException.of(response, request);
- if (response.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE)
- throw new RetryException(e);
-
- throw e;
- }
- }
+ ResponseVerifier throwOnError = new DefaultResponseVerifier() { };
/** Reads the response body, throwing an {@link UncheckedIOException} if this fails, or {@code null} if there is none. */
static byte[] getBytes(ClassicHttpResponse response) {
@@ -71,10 +63,10 @@ public interface ConfigServerClient extends AutoCloseable {
/** Builder for a request against a given set of hosts, using this config server client. */
interface RequestBuilder {
- /** Sets the request path. */
+ /** Appends to the request path. */
default RequestBuilder at(String... pathSegments) { return at(List.of(pathSegments)); }
- /** Sets the request path. */
+ /** Appends to the request path. */
RequestBuilder at(List<String> pathSegments);
/** Sets the request body as UTF-8 application/json. */
@@ -83,12 +75,20 @@ public interface ConfigServerClient extends AutoCloseable {
/** Sets the request body. */
RequestBuilder body(HttpEntity entity);
- /** Sets the parameter key/values for the request. Number of arguments must be even. */
+ /** Sets query parameters without a value, like {@code ?debug&recursive}. */
+ default RequestBuilder emptyParameters(String... keys) {
+ return emptyParameters(Arrays.asList(keys));
+ }
+
+ /** Sets query parameters without a value, like {@code ?debug&recursive}. */
+ RequestBuilder emptyParameters(List<String> keys);
+
+ /** Sets the parameter key/values for the request. Number of arguments must be even. Null values are omitted. */
default RequestBuilder parameters(String... pairs) {
return parameters(Arrays.asList(pairs));
}
- /** Sets the parameter key/values for the request. Number of arguments must be even. */
+ /** Sets the parameter key/values for the request. Number of arguments must be even. Null values are omitted. */
RequestBuilder parameters(List<String> pairs);
/** Overrides the default socket read timeout of the request. {@code Duration.ZERO} gives infinite timeout. */
@@ -109,7 +109,10 @@ public interface ConfigServerClient extends AutoCloseable {
* Sets the (error) response handler for this request. The default is {@link #throwOnError}.
* When the handler returns normally, the response is treated as a success, and passed on to a response mapper.
*/
- RequestBuilder handling(BiConsumer<ClassicHttpResponse, ClassicHttpRequest> handler);
+ RequestBuilder throwing(ResponseVerifier handler);
+
+ /** Reads the response as a {@link String}, or throws if unsuccessful. */
+ String read();
/** Reads and maps the response, or throws if unsuccessful. */
<T> T read(Function<byte[], T> mapper);
@@ -118,10 +121,88 @@ public interface ConfigServerClient extends AutoCloseable {
void discard();
/** Returns the raw response input stream, or throws if unsuccessful. The caller must close the returned stream. */
- InputStream stream();
+ HttpInputStream stream();
+
+ /** Uses the response and request, if successful, to generate a mapped response. */
+ <T> T handle(ResponseHandler<T> handler);
+
+ }
+
+
+ class HttpInputStream extends ForwardingInputStream {
+
+ private final ClassicHttpResponse response;
+
+ protected HttpInputStream(ClassicHttpResponse response) throws IOException {
+ super(response.getEntity() != null ? response.getEntity().getContent()
+ : InputStream.nullInputStream());
+ this.response = response;
+ }
+
+ public int statusCode() { return response.getCode(); }
+
+ public String contentType() { return response.getEntity().getContentType(); }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ response.close();
+ }
+
+ }
+
+
+ /** Reads a successful response and request to compute a result. */
+ @FunctionalInterface
+ interface ResponseHandler<T> {
+
+ /** Called with successful responses, as per {@link ResponseVerifier}. The caller must close the response. */
+ T handle(ClassicHttpResponse response, ClassicHttpRequest request) throws IOException;
}
+
+ /** Verifies a response, throwing on error responses, possibly indicating retries. */
+ @FunctionalInterface
+ interface ResponseVerifier {
+
+ /** Whether this status code means the response is an error response. */
+ default boolean isError(int statusCode) {
+ return statusCode < HttpStatus.SC_OK || HttpStatus.SC_REDIRECTION <= statusCode;
+ }
+
+ /** Whether this status code means we should retry. Has no effect if this is not also an error. */
+ default boolean shouldRetry(int statusCode) {
+ return statusCode == HttpStatus.SC_SERVICE_UNAVAILABLE;
+ }
+
+ /** Verifies the given response, consuming it and throwing if it is an error; or leaving it otherwise. */
+ default void verify(ClassicHttpResponse response, ClassicHttpRequest request) throws IOException {
+ if (isError(response.getCode())) {
+ try (response) {
+ byte[] body = response.getEntity() == null ? new byte[0] : EntityUtils.toByteArray(response.getEntity());
+ RuntimeException exception = toException(response.getCode(), body, request);
+ throw shouldRetry(response.getCode()) ? new RetryException(exception) : exception;
+ }
+ }
+ }
+
+ /** Throws the appropriate exception, for the given status code and body. */
+ RuntimeException toException(int statusCode, byte[] body, ClassicHttpRequest request);
+
+ }
+
+
+ interface DefaultResponseVerifier extends ResponseVerifier {
+
+ @Override
+ default RuntimeException toException(int statusCode, byte[] body, ClassicHttpRequest request) {
+ return new ResponseException(request + " failed with status " + statusCode + " and body '" + new String(body, UTF_8) + "'");
+ }
+
+ }
+
+
/** What host(s) to try for a request, in what order. A host may be specified multiple times, for retries. */
@FunctionalInterface
interface HostStrategy extends Iterable<URI> {
@@ -147,6 +228,7 @@ public interface ConfigServerClient extends AutoCloseable {
}
+
/** Exception wrapper that signals retries should be attempted. */
final class RetryException extends RuntimeException {
@@ -160,25 +242,12 @@ public interface ConfigServerClient extends AutoCloseable {
}
+
/** An exception due to server error, a bad request, or similar, which resulted in a non-OK HTTP response. */
class ResponseException extends RuntimeException {
- public ResponseException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public static ResponseException of(ClassicHttpResponse response, ClassicHttpRequest request) {
- String detail;
- Throwable thrown = null;
- try {
- detail = request.getEntity() == null ? " and no body"
- : " and body '" + EntityUtils.toString(request.getEntity()) + "'";
- }
- catch (IOException | ParseException e) {
- detail = ". Reading body failed";
- thrown = e;
- }
- return new ResponseException(request + " failed with status " + response.getCode() + detail, thrown);
+ public ResponseException(String message) {
+ super(message);
}
}
diff --git a/configserver-client/src/main/java/ai/vespa/hosted/client/HttpConfigServerClient.java b/configserver-client/src/main/java/ai/vespa/hosted/client/HttpConfigServerClient.java
index c5b07eceaf5..e00489f0c64 100644
--- a/configserver-client/src/main/java/ai/vespa/hosted/client/HttpConfigServerClient.java
+++ b/configserver-client/src/main/java/ai/vespa/hosted/client/HttpConfigServerClient.java
@@ -54,7 +54,6 @@ public class HttpConfigServerClient extends AbstractConfigServerClient {
false)
.disableAutomaticRetries()
.setUserAgent(userAgent)
- .setDefaultRequestConfig(defaultRequestConfig)
.build();
}
diff --git a/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java b/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java
index 3b46501a4bf..b2f17f43f5c 100644
--- a/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java
+++ b/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java
@@ -70,14 +70,15 @@ class HttpConfigServerClientTest {
server.resetRequests();
// Successful attempt returns.
- server.stubFor(get("/root/boot"))
+ server.stubFor(get("/root/boot/toot"))
.setResponse(okJson("{}").build());
assertEquals("{}",
client.send(HostStrategy.repeating(URI.create("http://localhost:" + server.port()), 10),
Method.GET)
.at("root", "boot")
+ .at("toot")
.read(String::new));
- server.verify(1, getRequestedFor(urlEqualTo("/root/boot")));
+ server.verify(1, getRequestedFor(urlEqualTo("/root/boot/toot")));
server.verify(1, anyRequestedFor(anyUrl()));
server.resetRequests();
@@ -88,10 +89,9 @@ class HttpConfigServerClientTest {
() -> client.send(HostStrategy.repeating(URI.create("http://localhost:" + server.port()), 10),
Method.GET)
.read(String::new));
- assertEquals("GET / failed with status 409 and no body", thrown.getMessage());
+ assertEquals("GET / failed with status 409 and body 'hi'", thrown.getMessage());
server.verify(1, getRequestedFor(urlEqualTo("/")));
server.verify(1, anyRequestedFor(anyUrl()));
-
}
}