diff options
Diffstat (limited to 'configserver-client/src/main/java/ai/vespa')
-rw-r--r-- | configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java | 63 | ||||
-rw-r--r-- | configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java | 125 |
2 files changed, 124 insertions, 64 deletions
diff --git a/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java b/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java index ceaf962d36b..626b08d17ee 100644 --- a/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java +++ b/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java @@ -9,11 +9,12 @@ import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.HttpEntity; import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.http.io.entity.HttpEntities; import org.apache.hc.core5.net.URIBuilder; import java.io.IOException; -import java.io.InputStream; import java.io.UncheckedIOException; import java.net.URI; import java.net.URISyntaxException; @@ -21,7 +22,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -117,8 +117,8 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient { private final List<String> pathSegments = new ArrayList<>(); private HttpEntity entity; private RequestConfig config = ConfigServerClient.defaultRequestConfig; - private BiConsumer<ClassicHttpResponse, ClassicHttpRequest> handler = ConfigServerClient::throwOnError; - private Consumer<IOException> catcher = ConfigServerClient::retryAll; + private ResponseVerifier verifier = ConfigServerClient.throwOnError; + private Consumer<IOException> catcher = ConfigServerClient.retryAll; private RequestBuilder(HostStrategy hosts, Method method) { if ( ! hosts.iterator().hasNext()) @@ -176,56 +176,54 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient { } @Override - public RequestBuilder handling(BiConsumer<ClassicHttpResponse, ClassicHttpRequest> handler) { - this.handler = requireNonNull(handler); + public RequestBuilder throwing(ResponseVerifier verifier) { + this.verifier = requireNonNull(verifier); return this; } @Override - public <T> T read(Function<byte[], T> mapper) { - return mapIfSuccess(input -> { - try (input) { - return mapper.apply(input.readAllBytes()); + public String read() { + return handle((response, __) -> { + try (response) { + return response.getEntity() == null ? "" : EntityUtils.toString(response.getEntity()); } - catch (IOException e) { - throw new RetryException(e); + catch (ParseException e) { + throw new IllegalStateException(e); // This isn't actually thrown by apache >_< + } + }); + } + + @Override + public <T> T read(Function<byte[], T> mapper) { + return handle((response, __) -> { + try (response) { + return mapper.apply(response.getEntity() == null ? new byte[0] : EntityUtils.toByteArray(response.getEntity())); } }); } @Override public void discard() throws UncheckedIOException, ResponseException { - mapIfSuccess(input -> { - try (input) { + handle((response, __) -> { + try (response) { return null; } - catch (IOException e) { - throw new RetryException(e); - } }); } @Override - public InputStream stream() throws UncheckedIOException, ResponseException { - return mapIfSuccess(input -> input); + public HttpInputStream stream() throws UncheckedIOException, ResponseException { + return handle((response, __) -> new HttpInputStream(response)); } - /** Returns the mapped body, if successful, retrying any IOException. The caller must close the body stream. */ - private <T> T mapIfSuccess(Function<InputStream, T> mapper) { + @Override + public <T> T handle(ResponseHandler<T> handler) { + uriBuilder.setPathSegments(pathSegments); return execute(this, (response, request) -> { try { - handler.accept(response, request); // This throws on unacceptable responses. - - InputStream body = response.getEntity() != null ? response.getEntity().getContent() - : InputStream.nullInputStream(); - return mapper.apply(new ForwardingInputStream(body) { - @Override - public void close() throws IOException { - super.close(); - response.close(); - } - }); + verifier.verify(response, request); // This throws on unacceptable responses. + return handler.handle(response, request); } catch (IOException | RuntimeException | Error e) { try { @@ -248,6 +246,7 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient { } + @SuppressWarnings("unchecked") private static <T extends Throwable> void sneakyThrow(Throwable t) throws T { throw (T) t; diff --git a/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java b/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java index 61d5e154141..f8cb22c65b0 100644 --- a/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java +++ b/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java @@ -4,10 +4,10 @@ package ai.vespa.hosted.client; import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.core5.http.ClassicHttpRequest; import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.HttpEntity; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.Method; -import org.apache.hc.core5.http.ParseException; import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.util.Timeout; @@ -42,20 +42,12 @@ public interface ConfigServerClient extends Closeable { .build(); /** Wraps with a {@link RetryException} and rethrows. */ - static void retryAll(IOException e) { + Consumer<IOException> retryAll = (e) -> { throw new RetryException(e); - } + }; /** Throws a a {@link RetryException} if {@code statusCode == 503}, or a {@link ResponseException} unless {@code 200 <= statusCode < 300}. */ - static void throwOnError(ClassicHttpResponse response, ClassicHttpRequest request) { - if (response.getCode() < HttpStatus.SC_OK || response.getCode() >= HttpStatus.SC_REDIRECTION) { - ResponseException e = ResponseException.of(response, request); - if (response.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) - throw new RetryException(e); - - throw e; - } - } + ResponseVerifier throwOnError = new DefaultResponseVerifier() { }; /** Reads the response body, throwing an {@link UncheckedIOException} if this fails, or {@code null} if there is none. */ static byte[] getBytes(ClassicHttpResponse response) { @@ -73,10 +65,10 @@ public interface ConfigServerClient extends Closeable { /** Builder for a request against a given set of hosts, using this config server client. */ interface RequestBuilder { - /** Sets the request path. */ + /** Appends to the request path. */ default RequestBuilder at(String... pathSegments) { return at(List.of(pathSegments)); } - /** Sets the request path. */ + /** Appends to the request path. */ RequestBuilder at(List<String> pathSegments); /** Sets the request body as UTF-8 application/json. */ @@ -111,7 +103,10 @@ public interface ConfigServerClient extends Closeable { * Sets the (error) response handler for this request. The default is {@link #throwOnError}. * When the handler returns normally, the response is treated as a success, and passed on to a response mapper. */ - RequestBuilder handling(BiConsumer<ClassicHttpResponse, ClassicHttpRequest> handler); + RequestBuilder throwing(ResponseVerifier handler); + + /** Reads the response as a {@link String}, or throws if unsuccessful. */ + String read(); /** Reads and maps the response, or throws if unsuccessful. */ <T> T read(Function<byte[], T> mapper); @@ -120,10 +115,88 @@ public interface ConfigServerClient extends Closeable { void discard(); /** Returns the raw response input stream, or throws if unsuccessful. The caller must close the returned stream. */ - InputStream stream(); + HttpInputStream stream(); + + /** Uses the response and request, if successful, to generate a mapped response. */ + <T> T handle(ResponseHandler<T> handler); } + + class HttpInputStream extends ForwardingInputStream { + + private final ClassicHttpResponse response; + + protected HttpInputStream(ClassicHttpResponse response) throws IOException { + super(response.getEntity() != null ? response.getEntity().getContent() + : InputStream.nullInputStream()); + this.response = response; + } + + public int statusCode() { return response.getCode(); } + + public String contentType() { return response.getEntity().getContentType(); } + + @Override + public void close() throws IOException { + super.close(); + response.close(); + } + + } + + + /** Reads a successful response and request to compute a result. */ + @FunctionalInterface + interface ResponseHandler<T> { + + /** Called with successful responses, as per {@link ResponseVerifier}. The caller must close the response. */ + T handle(ClassicHttpResponse response, ClassicHttpRequest request) throws IOException; + + } + + + /** Verifies a response, throwing on error responses, possibly indicating retries. */ + @FunctionalInterface + interface ResponseVerifier { + + /** Whether this status code means the response is an error response. */ + default boolean isError(int statusCode) { + return statusCode < HttpStatus.SC_OK || HttpStatus.SC_REDIRECTION <= statusCode; + } + + /** Whether this status code means we should retry. Has no effect if this is not also an error. */ + default boolean shouldRetry(int statusCode) { + return statusCode == HttpStatus.SC_SERVICE_UNAVAILABLE; + } + + /** Verifies the given response, consuming it and throwing if it is an error; or leaving it otherwise. */ + default void verify(ClassicHttpResponse response, ClassicHttpRequest request) throws IOException { + if (isError(response.getCode())) { + try (response) { + byte[] body = response.getEntity() == null ? new byte[0] : EntityUtils.toByteArray(response.getEntity()); + RuntimeException exception = toException(response.getCode(), body, request); + throw shouldRetry(response.getCode()) ? new RetryException(exception) : exception; + } + } + } + + /** Throws the appropriate exception, for the given status code and body. */ + RuntimeException toException(int statusCode, byte[] body, ClassicHttpRequest request); + + } + + + interface DefaultResponseVerifier extends ResponseVerifier { + + @Override + default RuntimeException toException(int statusCode, byte[] body, ClassicHttpRequest request) { + return new ResponseException(request + " failed with status " + statusCode + " and body '" + new String(body, UTF_8) + "'"); + } + + } + + /** What host(s) to try for a request, in what order. A host may be specified multiple times, for retries. */ @FunctionalInterface interface HostStrategy extends Iterable<URI> { @@ -149,6 +222,7 @@ public interface ConfigServerClient extends Closeable { } + /** Exception wrapper that signals retries should be attempted. */ final class RetryException extends RuntimeException { @@ -162,25 +236,12 @@ public interface ConfigServerClient extends Closeable { } + /** An exception due to server error, a bad request, or similar, which resulted in a non-OK HTTP response. */ class ResponseException extends RuntimeException { - public ResponseException(String message, Throwable cause) { - super(message, cause); - } - - public static ResponseException of(ClassicHttpResponse response, ClassicHttpRequest request) { - String detail; - Throwable thrown = null; - try { - detail = request.getEntity() == null ? " and no body" - : " and body '" + EntityUtils.toString(request.getEntity()) + "'"; - } - catch (IOException | ParseException e) { - detail = ". Reading body failed"; - thrown = e; - } - return new ResponseException(request + " failed with status " + response.getCode() + detail, thrown); + public ResponseException(String message) { + super(message); } } |