aboutsummaryrefslogtreecommitdiffstats
path: root/configserver-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-04-22 15:54:04 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-04-22 15:54:04 +0200
commit081ac334cad95878702e2272ac0b2f9136e59535 (patch)
tree687970bb31069a72ad03f4a0db9c1b7872b9eb3b /configserver-client
parentabcb7b59536d92b388af5e0dbafd050ad4e5f91d (diff)
Take two
Diffstat (limited to 'configserver-client')
-rw-r--r--configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java109
-rw-r--r--configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java119
-rw-r--r--configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java5
3 files changed, 142 insertions, 91 deletions
diff --git a/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java b/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java
index 0162b3b7a0f..d467402f79f 100644
--- a/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java
+++ b/configserver-client/src/main/java/ai/vespa/hosted/client/AbstractConfigServerClient.java
@@ -8,11 +8,9 @@ import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpEntity;
-import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.Method;
import org.apache.hc.core5.http.io.entity.HttpEntities;
import org.apache.hc.core5.net.URIBuilder;
-import org.apache.hc.core5.util.Timeout;
import java.io.IOException;
import java.io.InputStream;
@@ -23,10 +21,12 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Logger;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
@@ -36,19 +36,15 @@ import static java.util.logging.Level.WARNING;
*/
public abstract class AbstractConfigServerClient implements ConfigServerClient {
- static final RequestConfig defaultRequestConfig = RequestConfig.custom()
- .setConnectionRequestTimeout(Timeout.ofSeconds(5))
- .setConnectTimeout(Timeout.ofSeconds(5))
- .setRedirectsEnabled(false)
- .build();
-
private static final Logger log = Logger.getLogger(AbstractConfigServerClient.class.getName());
/** Executes the request with the given context. The caller must close the response. */
- protected abstract ClassicHttpResponse execute(ClassicHttpRequest request, HttpClientContext context) throws IOException;
+ abstract ClassicHttpResponse execute(ClassicHttpRequest request, HttpClientContext context) throws IOException;
/** Executes the given request with response/error handling and retries. */
- private <T> T execute(RequestBuilder builder, Function<ClassicHttpResponse, T> handler, Function<IOException, T> catcher) {
+ private <T> T execute(RequestBuilder builder,
+ BiFunction<ClassicHttpResponse, ClassicHttpRequest, T> handler,
+ Consumer<IOException> catcher) {
HttpClientContext context = HttpClientContext.create();
context.setRequestConfig(builder.config);
@@ -58,10 +54,11 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient {
request.setEntity(builder.entity);
try {
try {
- return handler.apply(execute(request, context));
+ return handler.apply(execute(request, context), request);
}
catch (IOException e) {
- return catcher.apply(e);
+ catcher.accept(e);
+ throw new UncheckedIOException(e); // Throw unchecked if catcher doesn't throw.
}
}
catch (RetryException e) {
@@ -86,7 +83,7 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient {
throw new IllegalStateException("Illegal retry cause: " + thrown.getClass(), thrown);
}
- throw new IllegalArgumentException("No hosts to perform the request against");
+ throw new IllegalStateException("No hosts to perform the request against");
}
/** Append path to the given host, which may already contain a root path. */
@@ -98,8 +95,8 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient {
pathSegments.addAll(pathAndQuery.getPathSegments());
try {
return builder.setPathSegments(pathSegments)
- .setParameters(pathAndQuery.getQueryParams())
- .build();
+ .setParameters(pathAndQuery.getQueryParams())
+ .build();
}
catch (URISyntaxException e) {
throw new IllegalArgumentException("URISyntaxException should not be possible here", e);
@@ -118,7 +115,9 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient {
private final HostStrategy hosts;
private final URIBuilder uriBuilder = new URIBuilder();
private HttpEntity entity;
- private RequestConfig config = defaultRequestConfig;
+ private RequestConfig config = ConfigServerClient.defaultRequestConfig;
+ private BiConsumer<ClassicHttpResponse, ClassicHttpRequest> handler = ConfigServerClient::throwOnError;
+ private Consumer<IOException> catcher = ConfigServerClient::retryAll;
private RequestBuilder(HostStrategy hosts, Method method) {
if ( ! hosts.iterator().hasNext())
@@ -166,17 +165,23 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient {
@Override
public RequestBuilder config(RequestConfig config) {
this.config = requireNonNull(config);
+ return this;
+ }
+ @Override
+ public RequestBuilder catching(Consumer<IOException> catcher) {
+ this.catcher = requireNonNull(catcher);
return this;
}
@Override
- public <T> T handle(Function<ClassicHttpResponse, T> handler, Function<IOException, T> catcher) throws UncheckedIOException {
- return execute(this, requireNonNull(handler), requireNonNull(catcher));
+ public RequestBuilder handling(BiConsumer<ClassicHttpResponse, ClassicHttpRequest> handler) {
+ this.handler = requireNonNull(handler);
+ return this;
}
@Override
- public <T> T read(Function<byte[], T> mapper) throws UncheckedIOException, ResponseException {
+ public <T> T read(Function<byte[], T> mapper) {
return mapIfSuccess(input -> {
try (input) {
return mapper.apply(input.readAllBytes());
@@ -206,38 +211,38 @@ public abstract class AbstractConfigServerClient implements ConfigServerClient {
/** Returns the mapped body, if successful, retrying any IOException. The caller must close the body stream. */
private <T> T mapIfSuccess(Function<InputStream, T> mapper) {
- return handle(response -> {
- try {
- InputStream body = response.getEntity() != null ? response.getEntity().getContent()
- : InputStream.nullInputStream();
- if (response.getCode() >= HttpStatus.SC_REDIRECTION)
- throw new ResponseException(response.getCode(), new String(body.readAllBytes(), UTF_8), "");
-
- return mapper.apply(new ForwardingInputStream(body) {
- @Override
- public void close() throws IOException {
- super.close();
- response.close();
- }
- });
- }
- catch (IOException | RuntimeException | Error e) {
- try {
- response.close();
- }
- catch (IOException f) {
- e.addSuppressed(f);
- }
- if (e instanceof IOException)
- throw new RetryException((IOException) e);
- else
- sneakyThrow(e); // e is a runtime exception or an error, so this is fine.
- throw new AssertionError("Should not happen");
- }
- },
- ioException -> {
- throw new RetryException(ioException);
- });
+ return execute(this,
+ (response, request) -> {
+ try {
+ handler.accept(response, request); // This throws on unacceptable responses.
+
+ InputStream body = response.getEntity() != null ? response.getEntity().getContent()
+ : InputStream.nullInputStream();
+ return mapper.apply(new ForwardingInputStream(body) {
+ @Override
+ public void close() throws IOException {
+ super.close();
+ response.close();
+ }
+ });
+ }
+ catch (IOException | RuntimeException | Error e) {
+ try {
+ response.close();
+ }
+ catch (IOException f) {
+ e.addSuppressed(f);
+ }
+ if (e instanceof IOException) {
+ catcher.accept((IOException) e);
+ throw new UncheckedIOException((IOException) e);
+ }
+ else
+ sneakyThrow(e); // e is a runtime exception or an error, so this is fine.
+ throw new AssertionError("Should not happen");
+ }
+ },
+ catcher);
}
}
diff --git a/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java b/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java
index 3b24f1a568e..c0f72378c1b 100644
--- a/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java
+++ b/configserver-client/src/main/java/ai/vespa/hosted/client/ConfigServerClient.java
@@ -2,9 +2,14 @@
package ai.vespa.hosted.client;
import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.HttpEntity;
+import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.Method;
+import org.apache.hc.core5.http.ParseException;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.hc.core5.util.Timeout;
import java.io.IOException;
import java.io.InputStream;
@@ -15,6 +20,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.IntStream;
@@ -26,6 +33,38 @@ import static java.util.stream.Collectors.toUnmodifiableList;
*/
public interface ConfigServerClient extends AutoCloseable {
+ RequestConfig defaultRequestConfig = RequestConfig.custom()
+ .setConnectionRequestTimeout(Timeout.ofSeconds(5))
+ .setConnectTimeout(Timeout.ofSeconds(5))
+ .setRedirectsEnabled(false)
+ .build();
+
+ /** Wraps with a {@link RetryException} and rethrows. */
+ static void retryAll(IOException e) {
+ throw new RetryException(e);
+ }
+
+ /** Throws a a {@link RetryException} if {@code statusCode == 503}, or a {@link ResponseException} unless {@code 200 <= statusCode < 300}. */
+ static void throwOnError(ClassicHttpResponse response, ClassicHttpRequest request) {
+ if (response.getCode() < HttpStatus.SC_OK || response.getCode() >= HttpStatus.SC_REDIRECTION) {
+ ResponseException e = ResponseException.of(response, request);
+ if (response.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE)
+ throw new RetryException(e);
+
+ throw e;
+ }
+ }
+
+ /** Reads the response body, throwing an {@link UncheckedIOException} if this fails, or {@code null} if there is none. */
+ static byte[] getBytes(ClassicHttpResponse response) {
+ try {
+ return response.getEntity() == null ? null : EntityUtils.toByteArray(response.getEntity());
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
/** Creates a builder for sending the given method, using the specified host strategy. */
RequestBuilder send(HostStrategy hosts, Method method);
@@ -59,37 +98,27 @@ public interface ConfigServerClient extends AutoCloseable {
RequestBuilder config(RequestConfig config);
/**
- * Sets custom retry/failure logic for this.
- * <p>
- * Exactly one of the callbacks will be invoked, with a non-null argument.
- * Return a value to have that returned to the caller;
- * throw a {@link RetryException} to have the request retried; or
- * throw any other unchecked exception to have this propagate out to the caller.
- * The caller must close the provided response, if any.
+ * Sets the catch clause for {@link IOException}s during execution of this.
+ * The default is to wrap the IOException in a {@link RetryException} and rethrow this;
+ * this makes the client retry the request, as long as there are remaining entries in the {@link HostStrategy}.
+ * If the catcher returns normally, the {@link IOException} is unchecked and thrown instead.
*/
- <T> T handle(Function<ClassicHttpResponse, T> handler, Function<IOException, T> catcher) throws UncheckedIOException;
+ RequestBuilder catching(Consumer<IOException> catcher);
- /** Sets the response body mapper for this, for successful requests. */
- <T> T read(Function<byte[], T> mapper) throws UncheckedIOException, ResponseException;
-
- /** Discards the response, but throws if the response is unsuccessful. */
- void discard() throws UncheckedIOException, ResponseException;
-
- /** Returns the raw input stream of the response, if successful. The caller must close the returned stream. */
- InputStream stream() throws UncheckedIOException, ResponseException;
+ /**
+ * Sets the (error) response handler for this request. The default is {@link #throwOnError}.
+ * When the handler returns normally, the response is treated as a success, and passed on to a response mapper.
+ */
+ RequestBuilder handling(BiConsumer<ClassicHttpResponse, ClassicHttpRequest> handler);
- }
+ /** Reads and maps the response, or throws if unsuccessful. */
+ <T> T read(Function<byte[], T> mapper);
- /** Exception wrapper that signals retries should be attempted. */
- final class RetryException extends RuntimeException {
+ /** Discards the response, but throws if unsuccessful. */
+ void discard();
- public RetryException(IOException cause) {
- super(requireNonNull(cause));
- }
-
- public RetryException(RuntimeException cause) {
- super(requireNonNull(cause));
- }
+ /** Returns the raw response input stream, or throws if unsuccessful. The caller must close the returned stream. */
+ InputStream stream();
}
@@ -118,21 +147,39 @@ public interface ConfigServerClient extends AutoCloseable {
}
- /** An exception due to server error, a bad request, or similar, which resulted in a non-OK HTTP response. */
- class ResponseException extends RuntimeException {
+ /** Exception wrapper that signals retries should be attempted. */
+ final class RetryException extends RuntimeException {
- private final int code;
- private final String body;
+ public RetryException(IOException cause) {
+ super(requireNonNull(cause));
+ }
- public ResponseException(int code, String body, String context) {
- super(context + ": " + body);
- this.code = code;
- this.body = body;
+ public RetryException(RuntimeException cause) {
+ super(requireNonNull(cause));
}
- public int code() { return code; }
+ }
+
+ /** An exception due to server error, a bad request, or similar, which resulted in a non-OK HTTP response. */
+ class ResponseException extends RuntimeException {
- public String body() { return body; }
+ public ResponseException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public static ResponseException of(ClassicHttpResponse response, ClassicHttpRequest request) {
+ String detail;
+ Throwable thrown = null;
+ try {
+ detail = request.getEntity() == null ? " and no body"
+ : " and body '" + EntityUtils.toString(request.getEntity()) + "'";
+ }
+ catch (IOException | ParseException e) {
+ detail = ". Reading body failed";
+ thrown = e;
+ }
+ return new ResponseException(request + " failed with status " + response.getCode() + detail, thrown);
+ }
}
diff --git a/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java b/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java
index dffc94e94c0..3b46501a4bf 100644
--- a/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java
+++ b/configserver-client/src/test/java/ai/vespa/hosted/client/HttpConfigServerClientTest.java
@@ -35,7 +35,7 @@ class HttpConfigServerClientTest {
@RegisterExtension
final WireMockExtension server = new WireMockExtension();
- final HttpConfigServerClient client = new HttpConfigServerClient(List.of(new AthenzService("mydomain", "yourservice")), "user");
+ final ConfigServerClient client = new HttpConfigServerClient(List.of(new AthenzService("mydomain", "yourservice")), "user");
@Test
void testRetries() {
@@ -88,8 +88,7 @@ class HttpConfigServerClientTest {
() -> client.send(HostStrategy.repeating(URI.create("http://localhost:" + server.port()), 10),
Method.GET)
.read(String::new));
- assertEquals(409, thrown.code());
- assertEquals("hi", thrown.body());
+ assertEquals("GET / failed with status 409 and no body", thrown.getMessage());
server.verify(1, getRequestedFor(urlEqualTo("/")));
server.verify(1, anyRequestedFor(anyUrl()));