aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--configserver/pom.xml5
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java207
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java15
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java20
-rw-r--r--http-utils/src/main/java/ai/vespa/util/http/VespaAsyncHttpClientBuilder.java1
5 files changed, 146 insertions, 102 deletions
diff --git a/configserver/pom.xml b/configserver/pom.xml
index 8cd1b4b4254..35c38eb7d7d 100644
--- a/configserver/pom.xml
+++ b/configserver/pom.xml
@@ -34,6 +34,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.yahoo.vespa</groupId>
<artifactId>defaults</artifactId>
<version>${project.version}</version>
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
index bad82180702..484204e6a53 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
@@ -1,26 +1,33 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.server.application;
-import ai.vespa.util.http.VespaClientBuilderFactory;
+import ai.vespa.util.http.VespaAsyncHttpClientBuilder;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.yahoo.component.AbstractComponent;
+import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.model.api.HostInfo;
import com.yahoo.config.model.api.PortInfo;
import com.yahoo.config.model.api.ServiceInfo;
+import com.yahoo.log.LogLevel;
import com.yahoo.slime.Cursor;
import com.yahoo.vespa.config.server.http.JSONResponse;
-import org.glassfish.jersey.client.ClientProperties;
-import org.glassfish.jersey.client.proxy.WebResourceFactory;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.ProcessingException;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientRequestFilter;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.HttpHeaders;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.net.URIBuilder;
+import org.apache.hc.core5.util.Timeout;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@@ -28,8 +35,14 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.logging.Logger;
-import java.util.stream.Collectors;
import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER;
import static com.yahoo.config.model.api.container.ContainerServiceType.CONTAINER;
@@ -41,13 +54,12 @@ import static com.yahoo.config.model.api.container.ContainerServiceType.QRSERVER
*
* @author Ulf Lilleengen
* @author hmusum
+ * @author bjorncs
*/
-@SuppressWarnings("removal")
public class ConfigConvergenceChecker extends AbstractComponent {
private static final Logger log = Logger.getLogger(ConfigConvergenceChecker.class.getName());
- private static final String statePath = "/state/v1/";
- private static final String configSubPath = "config";
+
private final static Set<String> serviceTypesToCheck = Set.of(
CONTAINER.serviceName,
QRSERVER.serviceName,
@@ -58,17 +70,13 @@ public class ConfigConvergenceChecker extends AbstractComponent {
"distributor"
);
- private final StateApiFactory stateApiFactory;
- private final VespaClientBuilderFactory clientBuilderFactory = new VespaClientBuilderFactory();
- @Inject
- public ConfigConvergenceChecker() {
- this(ConfigConvergenceChecker::createStateApi);
- }
+ private final Executor responseHandlerExecutor =
+ Executors.newSingleThreadExecutor(new DaemonThreadFactory("config-convergence-checker-response-handler-"));
+ private final ObjectMapper jsonMapper = new ObjectMapper();
- public ConfigConvergenceChecker(StateApiFactory stateApiFactory) {
- this.stateApiFactory = stateApiFactory;
- }
+ @Inject
+ public ConfigConvergenceChecker() {}
/** Fetches the active config generation for all services in the given application. */
public Map<ServiceInfo, Long> getServiceConfigGenerations(Application application, Duration timeoutPerService) {
@@ -92,62 +100,79 @@ public class ConfigConvergenceChecker extends AbstractComponent {
/** Check service identified by host and port in given application */
public JSONResponse getServiceConfigGenerationResponse(Application application, String hostAndPortToCheck, URI requestUrl, Duration timeout) {
Long wantedGeneration = application.getApplicationGeneration();
- try {
+ try (CloseableHttpAsyncClient client = createHttpClient()) {
+ client.start();
if ( ! hostInApplication(application, hostAndPortToCheck))
return ServiceResponse.createHostNotFoundInAppResponse(requestUrl, hostAndPortToCheck, wantedGeneration);
-
- long currentGeneration = getServiceGeneration(URI.create("http://" + hostAndPortToCheck), timeout);
+ long currentGeneration = getServiceGeneration(client, URI.create("http://" + hostAndPortToCheck), timeout).get();
boolean converged = currentGeneration >= wantedGeneration;
return ServiceResponse.createOkResponse(requestUrl, hostAndPortToCheck, wantedGeneration, currentGeneration, converged);
- } catch (ProcessingException e) { // e.g. if we cannot connect to the service to find generation
+ } catch (InterruptedException | ExecutionException | CancellationException e) { // e.g. if we cannot connect to the service to find generation
return ServiceResponse.createNotFoundResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage());
} catch (Exception e) {
return ServiceResponse.createErrorResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage());
}
}
- @Override
- public void deconstruct() {
- clientBuilderFactory.close();
- }
-
- @Path(statePath)
- public interface StateApi {
- @Path(configSubPath)
- @GET
- JsonNode config();
- }
-
- public interface StateApiFactory {
- StateApi createStateApi(Client client, URI serviceUri);
- }
-
/** Gets service generation for a list of services (in parallel). */
private Map<ServiceInfo, Long> getServiceGenerations(List<ServiceInfo> services, Duration timeout) {
- return services.parallelStream()
- .collect(Collectors.toMap(service -> service,
- service -> {
- try {
- return getServiceGeneration(URI.create("http://" + service.getHostName()
- + ":" + getStatePort(service).get()), timeout);
- }
- catch (ProcessingException e) { // Cannot connect to service to determine service generation
- return -1L;
- }
- },
- (v1, v2) -> { throw new IllegalStateException("Duplicate keys for values '" + v1 + "' and '" + v2 + "'."); },
- LinkedHashMap::new
- ));
+ try (CloseableHttpAsyncClient client = createHttpClient()) {
+ client.start();
+ List<CompletableFuture<Void>> inprogressRequests = new ArrayList<>();
+ ConcurrentMap<ServiceInfo, Long> temporaryResult = new ConcurrentHashMap<>();
+ for (ServiceInfo service : services) {
+ int statePort = getStatePort(service).orElse(0);
+ if (statePort <= 0) continue;
+ URI uri = URI.create("http://" + service.getHostName() + ":" + statePort);
+ CompletableFuture<Void> inprogressRequest = getServiceGeneration(client, uri, timeout)
+ .handle((result, error) -> {
+ if (result != null) {
+ temporaryResult.put(service, result);
+ } else {
+ log.log(
+ LogLevel.DEBUG,
+ error,
+ () -> String.format("Failed to retrieve service config generation for '%s': %s", service, error.getMessage()));
+ temporaryResult.put(service, -1L);
+ }
+ return null;
+ });
+ inprogressRequests.add(inprogressRequest);
+ }
+ CompletableFuture.allOf(inprogressRequests.toArray(CompletableFuture[]::new)).join();
+ return createMapOrderedByServiceList(services, temporaryResult);
+ } catch (IOException e) {
+ // Actual client implementation does not throw IOException on close()
+ throw new UncheckedIOException(e);
+ }
}
/** Get service generation of service at given URL */
- private long getServiceGeneration(URI serviceUrl, Duration timeout) {
- Client client = createClient(timeout);
+ private CompletableFuture<Long> getServiceGeneration(CloseableHttpAsyncClient client, URI serviceUrl, Duration timeout) {
+ SimpleHttpRequest request = SimpleHttpRequests.get(createApiUri(serviceUrl));
+ request.setHeader("Connection", "close");
+ request.setConfig(createRequestConfig(timeout));
+
+ // Ignoring returned Future object as we want to use the more flexible CompletableFuture instead
+ CompletableFuture<SimpleHttpResponse> responsePromise = new CompletableFuture<>();
+ client.execute(request, new FutureCallback<>() {
+ @Override public void completed(SimpleHttpResponse result) { responsePromise.complete(result); }
+ @Override public void failed(Exception ex) { responsePromise.completeExceptionally(ex); }
+ @Override public void cancelled() { responsePromise.cancel(false); }
+ });
+
+ // Don't do json parsing in http client's thread
+ return responsePromise.thenApplyAsync(this::handleResponse, responseHandlerExecutor);
+ }
+
+ private long handleResponse(SimpleHttpResponse response) throws UncheckedIOException {
try {
- StateApi state = stateApiFactory.createStateApi(client, serviceUrl);
- return generationFromContainerState(state.config());
- } finally {
- client.close();
+ int statusCode = response.getCode();
+ if (statusCode != HttpStatus.SC_OK) throw new IOException("Expected status code 200, got " + statusCode);
+ if (response.getBody() == null) throw new IOException("Response has no content");
+ return generationFromContainerState(jsonMapper.readTree(response.getBodyText()));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
}
@@ -166,16 +191,6 @@ public class ConfigConvergenceChecker extends AbstractComponent {
return false;
}
- private Client createClient(Duration timeout) {
- return clientBuilderFactory.newBuilder()
- .register(
- (ClientRequestFilter) ctx ->
- ctx.getHeaders().put(HttpHeaders.USER_AGENT, List.of("config-convergence-checker")))
- .property(ClientProperties.CONNECT_TIMEOUT, (int) timeout.toMillis())
- .property(ClientProperties.READ_TIMEOUT, (int) timeout.toMillis())
- .build();
- }
-
private static Optional<Integer> getStatePort(ServiceInfo service) {
return service.getPorts().stream()
.filter(port -> port.getTags().contains("state"))
@@ -187,9 +202,47 @@ public class ConfigConvergenceChecker extends AbstractComponent {
return state.get("config").get("generation").asLong(-1);
}
- private static StateApi createStateApi(Client client, URI uri) {
- WebTarget target = client.target(uri);
- return WebResourceFactory.newResource(StateApi.class, target);
+ private static Map<ServiceInfo, Long> createMapOrderedByServiceList(
+ List<ServiceInfo> services, ConcurrentMap<ServiceInfo, Long> result) {
+ Map<ServiceInfo, Long> orderedResult = new LinkedHashMap<>();
+ for (ServiceInfo service : services) {
+ Long generation = result.get(service);
+ if (generation != null) {
+ orderedResult.put(service, generation);
+ }
+ }
+ return orderedResult;
+ }
+
+ private static URI createApiUri(URI serviceUrl) {
+ try {
+ return new URIBuilder(serviceUrl)
+ .setPath("/state/v1/config")
+ .build();
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ private static RequestConfig createRequestConfig(Duration timeout) {
+ return RequestConfig.custom()
+ .setConnectionRequestTimeout(Timeout.ofSeconds(1))
+ .setResponseTimeout(Timeout.ofMilliseconds(timeout.toMillis()))
+ .setConnectTimeout(Timeout.ofSeconds(1))
+ .build();
+ }
+
+ private static CloseableHttpAsyncClient createHttpClient() {
+ return VespaAsyncHttpClientBuilder
+ .create(tlsStrategy ->
+ PoolingAsyncClientConnectionManagerBuilder.create()
+ .setMaxConnTotal(100)
+ .setMaxConnPerRoute(10)
+ .setTlsStrategy(tlsStrategy)
+ .build())
+ .setUserAgent("config-convergence-checker")
+ .setConnectionReuseStrategy((request, response, context) -> false) // Disable connection reuse
+ .build();
}
private static class ServiceListResponse extends JSONResponse {
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java
index 4b25c36a9d7..8a3a3b6e0ca 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java
@@ -30,8 +30,8 @@ import static com.github.tomakehurst.wiremock.client.WireMock.okJson;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import static com.yahoo.test.json.JsonTestHelper.assertJsonEquals;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
/**
* @author Ulf Lilleengen
@@ -183,10 +183,15 @@ public class ConfigConvergenceCheckerTest {
.withBody("response too slow")));
HttpResponse response = checker.getServiceConfigGenerationResponse(application, hostAndPort(service), requestUrl, Duration.ofMillis(1));
// Message contained in a SocketTimeoutException may differ across platforms, so we do a partial match of the response here
- assertResponse((responseBody) -> assertTrue("Response matches", responseBody.startsWith(
- "{\"url\":\"" + requestUrl.toString() + "\",\"host\":\"" + hostAndPort(requestUrl) +
- "\",\"wantedGeneration\":3,\"error\":\"java.net.SocketTimeoutException") &&
- responseBody.endsWith("\"}")), 404, response);
+ assertResponse(
+ responseBody ->
+ assertThat(responseBody)
+ .startsWith("{\"url\":\"" + requestUrl.toString() + "\",\"host\":\"" + hostAndPort(requestUrl) +
+ "\",\"wantedGeneration\":3,\"error\":\"")
+ .contains("java.net.SocketTimeoutException: 1 MILLISECONDS")
+ .endsWith("\"}"),
+ 404,
+ response);
}
private URI testServer() {
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
index 3e98c1a6533..910c4b069e3 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
@@ -1,7 +1,6 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.server.http.v2;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.yahoo.cloud.config.ConfigserverConfig;
import com.yahoo.component.Version;
import com.yahoo.config.model.api.ModelFactory;
@@ -15,7 +14,6 @@ import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.jdisc.Response;
import com.yahoo.jdisc.http.HttpRequest.Method;
import com.yahoo.test.ManualClock;
-import com.yahoo.test.json.JsonTestHelper;
import com.yahoo.vespa.config.server.ApplicationRepository;
import com.yahoo.vespa.config.server.MockLogRetriever;
import com.yahoo.vespa.config.server.MockProvisioner;
@@ -25,7 +23,6 @@ import com.yahoo.vespa.config.server.application.ApplicationCuratorDatabase;
import com.yahoo.vespa.config.server.application.ApplicationReindexing;
import com.yahoo.vespa.config.server.application.ClusterReindexing;
import com.yahoo.vespa.config.server.application.ClusterReindexing.Status;
-import com.yahoo.vespa.config.server.application.ConfigConvergenceChecker;
import com.yahoo.vespa.config.server.application.HttpProxy;
import com.yahoo.vespa.config.server.application.OrchestratorMock;
import com.yahoo.vespa.config.server.deploy.DeployTester;
@@ -45,13 +42,11 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import javax.ws.rs.client.Client;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
@@ -638,21 +633,6 @@ public class ApplicationHandlerTest {
return createApplicationHandler().handle(createTestRequest(restartUrl, GET));
}
- private static class MockStateApiFactory implements ConfigConvergenceChecker.StateApiFactory {
- boolean createdApi = false;
- @Override
- public ConfigConvergenceChecker.StateApi createStateApi(Client client, URI serviceUri) {
- createdApi = true;
- return () -> {
- try {
- return new ObjectMapper().readTree("{\"config\":{\"generation\":1}}");
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- };
- }
- }
-
private ApplicationHandler createApplicationHandler() {
return createApplicationHandler(applicationRepository);
}
diff --git a/http-utils/src/main/java/ai/vespa/util/http/VespaAsyncHttpClientBuilder.java b/http-utils/src/main/java/ai/vespa/util/http/VespaAsyncHttpClientBuilder.java
index 51e83bd870a..6c53ea0dc69 100644
--- a/http-utils/src/main/java/ai/vespa/util/http/VespaAsyncHttpClientBuilder.java
+++ b/http-utils/src/main/java/ai/vespa/util/http/VespaAsyncHttpClientBuilder.java
@@ -66,6 +66,7 @@ public class VespaAsyncHttpClientBuilder {
clientBuilder.disableAuthCaching();
clientBuilder.disableRedirectHandling();
clientBuilder.setConnectionManager(factory.create(tlsStrategy));
+ clientBuilder.setConnectionManagerShared(false);
return clientBuilder;
}