diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-04-22 15:54:04 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-04-22 15:54:04 +0200 |
commit | 081ac334cad95878702e2272ac0b2f9136e59535 (patch) | |
tree | 687970bb31069a72ad03f4a0db9c1b7872b9eb3b /configserver-client/src | |
parent | abcb7b59536d92b388af5e0dbafd050ad4e5f91d (diff) |
Take two
Diffstat (limited to 'configserver-client/src')
3 files changed, 142 insertions, 91 deletions
diff --git a/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java b/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java index 0162b3b7a0f..d467402f79f 100644 --- a/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java +++ b/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java @@ -8,11 +8,9 @@ import org.apache.hc.core5.http.ClassicHttpRequest; import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.HttpEntity; -import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.Method; import org.apache.hc.core5.http.io.entity.HttpEntities; import org.apache.hc.core5.net.URIBuilder; -import org.apache.hc.core5.util.Timeout; import java.io.IOException; import java.io.InputStream; @@ -23,10 +21,12 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import java.util.logging.Logger; -import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; @@ -36,19 +36,15 @@ import static java.util.logging.Level.WARNING; */ public abstract class AbstractConfigServerClient implements ConfigServerClient { - static final RequestConfig defaultRequestConfig = RequestConfig.custom() - .setConnectionRequestTimeout(Timeout.ofSeconds(5)) - .setConnectTimeout(Timeout.ofSeconds(5)) - .setRedirectsEnabled(false) - .build(); - private static final Logger log = Logger.getLogger(AbstractConfigServerClient.class.getName()); /** Executes the request with the given context. The caller must close the response. */ - protected abstract ClassicHttpResponse execute(ClassicHttpRequest request, HttpClientContext context) throws IOException; + abstract ClassicHttpResponse execute(ClassicHttpRequest request, HttpClientContext context) throws IOException; /** Executes the given request with response/error handling and retries. */ - private <T> T execute(RequestBuilder builder, Function<ClassicHttpResponse, T> handler, Function<IOException, T> catcher) { + private <T> T execute(RequestBuilder builder, + BiFunction<ClassicHttpResponse, ClassicHttpRequest, T> handler, + Consumer<IOException> catcher) { HttpClientContext context = HttpClientContext.create(); context.setRequestConfig(builder.config); @@ -58,10 +54,11 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient { request.setEntity(builder.entity); try { try { - return handler.apply(execute(request, context)); + return handler.apply(execute(request, context), request); } catch (IOException e) { - return catcher.apply(e); + catcher.accept(e); + throw new UncheckedIOException(e); // Throw unchecked if catcher doesn't throw. } } catch (RetryException e) { @@ -86,7 +83,7 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient { throw new IllegalStateException("Illegal retry cause: " + thrown.getClass(), thrown); } - throw new IllegalArgumentException("No hosts to perform the request against"); + throw new IllegalStateException("No hosts to perform the request against"); } /** Append path to the given host, which may already contain a root path. */ @@ -98,8 +95,8 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient { pathSegments.addAll(pathAndQuery.getPathSegments()); try { return builder.setPathSegments(pathSegments) - .setParameters(pathAndQuery.getQueryParams()) - .build(); + .setParameters(pathAndQuery.getQueryParams()) + .build(); } catch (URISyntaxException e) { throw new IllegalArgumentException("URISyntaxException should not be possible here", e); @@ -118,7 +115,9 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient { private final HostStrategy hosts; private final URIBuilder uriBuilder = new URIBuilder(); private HttpEntity entity; - private RequestConfig config = defaultRequestConfig; + private RequestConfig config = ConfigServerClient.defaultRequestConfig; + private BiConsumer<ClassicHttpResponse, ClassicHttpRequest> handler = ConfigServerClient::throwOnError; + private Consumer<IOException> catcher = ConfigServerClient::retryAll; private RequestBuilder(HostStrategy hosts, Method method) { if ( ! hosts.iterator().hasNext()) @@ -166,17 +165,23 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient { @Override public RequestBuilder config(RequestConfig config) { this.config = requireNonNull(config); + return this; + } + @Override + public RequestBuilder catching(Consumer<IOException> catcher) { + this.catcher = requireNonNull(catcher); return this; } @Override - public <T> T handle(Function<ClassicHttpResponse, T> handler, Function<IOException, T> catcher) throws UncheckedIOException { - return execute(this, requireNonNull(handler), requireNonNull(catcher)); + public RequestBuilder handling(BiConsumer<ClassicHttpResponse, ClassicHttpRequest> handler) { + this.handler = requireNonNull(handler); + return this; } @Override - public <T> T read(Function<byte[], T> mapper) throws UncheckedIOException, ResponseException { + public <T> T read(Function<byte[], T> mapper) { return mapIfSuccess(input -> { try (input) { return mapper.apply(input.readAllBytes()); @@ -206,38 +211,38 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient { /** Returns the mapped body, if successful, retrying any IOException. The caller must close the body stream. */ private <T> T mapIfSuccess(Function<InputStream, T> mapper) { - return handle(response -> { - try { - InputStream body = response.getEntity() != null ? response.getEntity().getContent() - : InputStream.nullInputStream(); - if (response.getCode() >= HttpStatus.SC_REDIRECTION) - throw new ResponseException(response.getCode(), new String(body.readAllBytes(), UTF_8), ""); - - return mapper.apply(new ForwardingInputStream(body) { - @Override - public void close() throws IOException { - super.close(); - response.close(); - } - }); - } - catch (IOException | RuntimeException | Error e) { - try { - response.close(); - } - catch (IOException f) { - e.addSuppressed(f); - } - if (e instanceof IOException) - throw new RetryException((IOException) e); - else - sneakyThrow(e); // e is a runtime exception or an error, so this is fine. - throw new AssertionError("Should not happen"); - } - }, - ioException -> { - throw new RetryException(ioException); - }); + return execute(this, + (response, request) -> { + try { + handler.accept(response, request); // This throws on unacceptable responses. + + InputStream body = response.getEntity() != null ? response.getEntity().getContent() + : InputStream.nullInputStream(); + return mapper.apply(new ForwardingInputStream(body) { + @Override + public void close() throws IOException { + super.close(); + response.close(); + } + }); + } + catch (IOException | RuntimeException | Error e) { + try { + response.close(); + } + catch (IOException f) { + e.addSuppressed(f); + } + if (e instanceof IOException) { + catcher.accept((IOException) e); + throw new UncheckedIOException((IOException) e); + } + else + sneakyThrow(e); // e is a runtime exception or an error, so this is fine. + throw new AssertionError("Should not happen"); + } + }, + catcher); } } diff --git a/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java b/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java index 3b24f1a568e..c0f72378c1b 100644 --- a/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java +++ b/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java @@ -2,9 +2,14 @@ package ai.vespa.hosted.client; import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.core5.http.ClassicHttpRequest; import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.util.Timeout; import java.io.IOException; import java.io.InputStream; @@ -15,6 +20,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.IntStream; @@ -26,6 +33,38 @@ import static java.util.stream.Collectors.toUnmodifiableList; */ public interface ConfigServerClient extends AutoCloseable { + RequestConfig defaultRequestConfig = RequestConfig.custom() + .setConnectionRequestTimeout(Timeout.ofSeconds(5)) + .setConnectTimeout(Timeout.ofSeconds(5)) + .setRedirectsEnabled(false) + .build(); + + /** Wraps with a {@link RetryException} and rethrows. */ + static void retryAll(IOException e) { + throw new RetryException(e); + } + + /** Throws a a {@link RetryException} if {@code statusCode == 503}, or a {@link ResponseException} unless {@code 200 <= statusCode < 300}. */ + static void throwOnError(ClassicHttpResponse response, ClassicHttpRequest request) { + if (response.getCode() < HttpStatus.SC_OK || response.getCode() >= HttpStatus.SC_REDIRECTION) { + ResponseException e = ResponseException.of(response, request); + if (response.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) + throw new RetryException(e); + + throw e; + } + } + + /** Reads the response body, throwing an {@link UncheckedIOException} if this fails, or {@code null} if there is none. */ + static byte[] getBytes(ClassicHttpResponse response) { + try { + return response.getEntity() == null ? null : EntityUtils.toByteArray(response.getEntity()); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + /** Creates a builder for sending the given method, using the specified host strategy. */ RequestBuilder send(HostStrategy hosts, Method method); @@ -59,37 +98,27 @@ public interface ConfigServerClient extends AutoCloseable { RequestBuilder config(RequestConfig config); /** - * Sets custom retry/failure logic for this. - * <p> - * Exactly one of the callbacks will be invoked, with a non-null argument. - * Return a value to have that returned to the caller; - * throw a {@link RetryException} to have the request retried; or - * throw any other unchecked exception to have this propagate out to the caller. - * The caller must close the provided response, if any. + * Sets the catch clause for {@link IOException}s during execution of this. + * The default is to wrap the IOException in a {@link RetryException} and rethrow this; + * this makes the client retry the request, as long as there are remaining entries in the {@link HostStrategy}. + * If the catcher returns normally, the {@link IOException} is unchecked and thrown instead. */ - <T> T handle(Function<ClassicHttpResponse, T> handler, Function<IOException, T> catcher) throws UncheckedIOException; + RequestBuilder catching(Consumer<IOException> catcher); - /** Sets the response body mapper for this, for successful requests. */ - <T> T read(Function<byte[], T> mapper) throws UncheckedIOException, ResponseException; - - /** Discards the response, but throws if the response is unsuccessful. */ - void discard() throws UncheckedIOException, ResponseException; - - /** Returns the raw input stream of the response, if successful. The caller must close the returned stream. */ - InputStream stream() throws UncheckedIOException, ResponseException; + /** + * Sets the (error) response handler for this request. The default is {@link #throwOnError}. + * When the handler returns normally, the response is treated as a success, and passed on to a response mapper. + */ + RequestBuilder handling(BiConsumer<ClassicHttpResponse, ClassicHttpRequest> handler); - } + /** Reads and maps the response, or throws if unsuccessful. */ + <T> T read(Function<byte[], T> mapper); - /** Exception wrapper that signals retries should be attempted. */ - final class RetryException extends RuntimeException { + /** Discards the response, but throws if unsuccessful. */ + void discard(); - public RetryException(IOException cause) { - super(requireNonNull(cause)); - } - - public RetryException(RuntimeException cause) { - super(requireNonNull(cause)); - } + /** Returns the raw response input stream, or throws if unsuccessful. The caller must close the returned stream. */ + InputStream stream(); } @@ -118,21 +147,39 @@ public interface ConfigServerClient extends AutoCloseable { } - /** An exception due to server error, a bad request, or similar, which resulted in a non-OK HTTP response. */ - class ResponseException extends RuntimeException { + /** Exception wrapper that signals retries should be attempted. */ + final class RetryException extends RuntimeException { - private final int code; - private final String body; + public RetryException(IOException cause) { + super(requireNonNull(cause)); + } - public ResponseException(int code, String body, String context) { - super(context + ": " + body); - this.code = code; - this.body = body; + public RetryException(RuntimeException cause) { + super(requireNonNull(cause)); } - public int code() { return code; } + } + + /** An exception due to server error, a bad request, or similar, which resulted in a non-OK HTTP response. */ + class ResponseException extends RuntimeException { - public String body() { return body; } + public ResponseException(String message, Throwable cause) { + super(message, cause); + } + + public static ResponseException of(ClassicHttpResponse response, ClassicHttpRequest request) { + String detail; + Throwable thrown = null; + try { + detail = request.getEntity() == null ? " and no body" + : " and body '" + EntityUtils.toString(request.getEntity()) + "'"; + } + catch (IOException | ParseException e) { + detail = ". Reading body failed"; + thrown = e; + } + return new ResponseException(request + " failed with status " + response.getCode() + detail, thrown); + } } diff --git a/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java b/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java index dffc94e94c0..3b46501a4bf 100644 --- a/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java +++ b/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java @@ -35,7 +35,7 @@ class HttpConfigServerClientTest { @RegisterExtension final WireMockExtension server = new WireMockExtension(); - final HttpConfigServerClient client = new HttpConfigServerClient(List.of(new AthenzService("mydomain", "yourservice")), "user"); + final ConfigServerClient client = new HttpConfigServerClient(List.of(new AthenzService("mydomain", "yourservice")), "user"); @Test void testRetries() { @@ -88,8 +88,7 @@ class HttpConfigServerClientTest { () -> client.send(HostStrategy.repeating(URI.create("http://localhost:" + server.port()), 10), Method.GET) .read(String::new)); - assertEquals(409, thrown.code()); - assertEquals("hi", thrown.body()); + assertEquals("GET / failed with status 409 and no body", thrown.getMessage()); server.verify(1, getRequestedFor(urlEqualTo("/"))); server.verify(1, anyRequestedFor(anyUrl())); |