diff options
4 files changed, 46 insertions, 30 deletions
diff --git a/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java b/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java index 2055cd6d74a..a9f36da9f97 100644 --- a/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java +++ b/http-client/src/main/java/ai/vespa/hosted/client/AbstractHttpClient.java @@ -26,11 +26,13 @@ import java.io.UncheckedIOException; import java.net.URI; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Supplier; import java.util.logging.Logger; import static java.util.Objects.requireNonNull; @@ -67,10 +69,14 @@ public abstract class AbstractHttpClient implements HttpClient { Throwable thrown = null; for (URI host : builder.hosts) { + Query query = builder.query; + for (Supplier<Query> dynamic : builder.dynamicQuery) + query = query.set(dynamic.get().lastEntries()); + ClassicHttpRequest request = ClassicRequestBuilder.create(builder.method.name()) .setUri(HttpURL.from(host) .appendPath(builder.path) - .appendQuery(builder.query) + .appendQuery(query) .asURI()) .build(); request.setEntity(builder.entity); @@ -143,6 +149,7 @@ public abstract class AbstractHttpClient implements HttpClient { private final HostStrategy hosts; private HttpURL.Path path = Path.empty(); private HttpURL.Query query = Query.empty(); + private List<Supplier<Query>> dynamicQuery = new ArrayList<>(); private HttpEntity entity; private RequestConfig config = HttpClient.defaultRequestConfig; private ResponseVerifier verifier = HttpClient.throwOnError; @@ -203,6 +210,12 @@ public abstract class AbstractHttpClient implements HttpClient { } @Override + public HttpClient.RequestBuilder parameters(Supplier<Query> query) { + dynamicQuery.add(query); + return this; + } + + @Override public RequestBuilder timeout(Duration timeout) { return config(RequestConfig.copy(config) .setResponseTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS) diff --git a/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java b/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java index f5805ce5b94..e5a8ebcc8b3 100644 --- a/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java +++ b/http-client/src/main/java/ai/vespa/hosted/client/HttpClient.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.IntStream; import static java.nio.charset.StandardCharsets.UTF_8; @@ -99,6 +100,9 @@ public interface HttpClient extends Closeable { /** Appends all parameters from the given query. */ RequestBuilder parameters(Query query); + /** Sets all parameters from the given query dynamically, when creating retried requests. */ + RequestBuilder parameters(Supplier<Query> query); + /** Overrides the default socket read timeout of the request. {@code Duration.ZERO} gives infinite timeout. */ RequestBuilder timeout(Duration timeout); diff --git a/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/ClusterControllerClientImpl.java b/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/ClusterControllerClientImpl.java index 211b9f9ff0a..76f2af43579 100644 --- a/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/ClusterControllerClientImpl.java +++ b/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/ClusterControllerClientImpl.java @@ -30,6 +30,7 @@ import java.io.UncheckedIOException; import java.net.URI; import java.util.Iterator; import java.util.List; +import java.util.stream.Collectors; import static java.nio.charset.StandardCharsets.UTF_8; @@ -59,9 +60,11 @@ public class ClusterControllerClientImpl implements ClusterControllerClient { @Override public boolean setNodeState(OrchestratorContext context, HostName host, int storageNodeIndex, ClusterControllerNodeState wantedState) { try { - Inspector response = client.send(strategyWithTimeout(hosts, context.getClusterControllerTimeouts()), Method.POST) + ClusterControllerClientTimeouts timeouts = context.getClusterControllerTimeouts(); + Inspector response = client.send(strategy(hosts), Method.POST) .at("cluster", "v2", clusterName, "storage", Integer.toString(storageNodeIndex)) - .deadline(context.getClusterControllerTimeouts().readBudget()) + .deadline(timeouts.readBudget()) + .parameters(() -> deadline(timeouts)) .body(stateChangeRequestBytes(wantedState, Condition.SAFE, context.isProbe())) .throwing(retryOnRedirect) .read(SlimeUtils::jsonToSlime).get(); @@ -105,9 +108,11 @@ public class ClusterControllerClientImpl implements ClusterControllerClient { public void setApplicationState(OrchestratorContext context, ApplicationInstanceId applicationId, ClusterControllerNodeState wantedState) throws ApplicationStateChangeDeniedException { try { - Inspector response = client.send(strategyWithTimeout(hosts, context.getClusterControllerTimeouts()), Method.POST) + ClusterControllerClientTimeouts timeouts = context.getClusterControllerTimeouts(); + Inspector response = client.send(strategy(hosts), Method.POST) .at("cluster", "v2", clusterName) - .deadline(context.getClusterControllerTimeouts().readBudget()) + .deadline(timeouts.readBudget()) + .parameters(() -> deadline(timeouts)) .body(stateChangeRequestBytes(wantedState, Condition.FORCE, false)) .throwing(retryOnRedirect) .read(SlimeUtils::jsonToSlime).get(); @@ -142,21 +147,24 @@ public class ClusterControllerClientImpl implements ClusterControllerClient { } /** ᕙ༼◕_◕༽ᕤ hack to vary query parameters with retries ᕙ༼◕_◕༽ᕤ */ - static HostStrategy strategyWithTimeout(List<HostName> hosts, ClusterControllerClientTimeouts timeouts) { - return () -> new Iterator<>() { - final Iterator<HostName> wrapped = hosts.iterator(); - @Override public boolean hasNext() { - return wrapped.hasNext(); - } - @Override public URI next() { - return HttpURL.create(Scheme.http, - DomainName.of(wrapped.next().s()), - 19050, - Path.empty(), - Query.empty().set("timeout", Double.toString(timeouts.getServerTimeoutOrThrow().toMillis() * 1e-3))) - .asURI(); - } - }; + static HostStrategy strategy(List<HostName> hosts) { + return hosts.size() == 1 + // If there's only 1 CC, we'll try that one twice. + ? HostStrategy.repeating(toUrl(hosts.get(0)), 2) + // Otherwise, try each host once: + // * if host 1 responds, it will redirect to master if necessary; otherwise + // * if host 2 responds, it will redirect to master if necessary; otherwise + // * if host 3 responds, it may redirect to master if necessary (if they're up + // after all), but more likely there's no quorum and this will fail too. + : HostStrategy.ordered(hosts.stream().map(ClusterControllerClientImpl::toUrl).collect(Collectors.toList())); + } + + static URI toUrl(HostName host) { + return HttpURL.create(Scheme.http, DomainName.of(host.s()), 19050).asURI(); + } + + static Query deadline(ClusterControllerClientTimeouts timeouts) { + return Query.empty().set("timeout", Double.toString(timeouts.getServerTimeoutOrThrow().toMillis() * 1e-3)); } static final ResponseVerifier retryOnRedirect = new ResponseVerifier() { diff --git a/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactory.java b/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactory.java index c98cefc6c86..08cffcc461c 100644 --- a/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactory.java +++ b/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/controller/RetryingClusterControllerClientFactory.java @@ -35,16 +35,7 @@ public class RetryingClusterControllerClientFactory extends AbstractComponent im @Override public ClusterControllerClient createClient(List<HostName> clusterControllers, String clusterName) { - List<HostName> hosts = clusterControllers.size() == 1 - // If there's only 1 CC, we'll try that one twice. - ? List.of(clusterControllers.get(0), clusterControllers.get(0)) - // Otherwise, try each host once: - // * if host 1 responds, it will redirect to master if necessary; otherwise - // * if host 2 responds, it will redirect to master if necessary; otherwise - // * if host 3 responds, it may redirect to master if necessary (if they're up - // after all), but more likely there's no quorum and this will fail too. - : List.copyOf(clusterControllers); - return new ClusterControllerClientImpl(client, hosts, clusterName); + return new ClusterControllerClientImpl(client, clusterControllers, clusterName); } @Override |