diff options
5 files changed, 146 insertions, 102 deletions
diff --git a/configserver/pom.xml b/configserver/pom.xml index 8cd1b4b4254..35c38eb7d7d 100644 --- a/configserver/pom.xml +++ b/configserver/pom.xml @@ -34,6 +34,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>com.yahoo.vespa</groupId> <artifactId>defaults</artifactId> <version>${project.version}</version> diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java index bad82180702..484204e6a53 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java @@ -1,26 +1,33 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.application; -import ai.vespa.util.http.VespaClientBuilderFactory; +import ai.vespa.util.http.VespaAsyncHttpClientBuilder; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.model.api.HostInfo; import com.yahoo.config.model.api.PortInfo; import com.yahoo.config.model.api.ServiceInfo; +import com.yahoo.log.LogLevel; import com.yahoo.slime.Cursor; import com.yahoo.vespa.config.server.http.JSONResponse; -import org.glassfish.jersey.client.ClientProperties; -import org.glassfish.jersey.client.proxy.WebResourceFactory; - -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.ProcessingException; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientRequestFilter; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.HttpHeaders; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequests; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.net.URIBuilder; +import org.apache.hc.core5.util.Timeout; + +import java.io.IOException; +import java.io.UncheckedIOException; import java.net.URI; +import java.net.URISyntaxException; import java.time.Duration; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -28,8 +35,14 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.logging.Logger; -import java.util.stream.Collectors; import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER; import static com.yahoo.config.model.api.container.ContainerServiceType.CONTAINER; @@ -41,13 +54,12 @@ import static com.yahoo.config.model.api.container.ContainerServiceType.QRSERVER * * @author Ulf Lilleengen * @author hmusum + * @author bjorncs */ -@SuppressWarnings("removal") public class ConfigConvergenceChecker extends AbstractComponent { private static final Logger log = Logger.getLogger(ConfigConvergenceChecker.class.getName()); - private static final String statePath = "/state/v1/"; - private static final String configSubPath = "config"; + private final static Set<String> serviceTypesToCheck = Set.of( CONTAINER.serviceName, QRSERVER.serviceName, @@ -58,17 +70,13 @@ public class ConfigConvergenceChecker extends AbstractComponent { "distributor" ); - private final StateApiFactory stateApiFactory; - private final VespaClientBuilderFactory clientBuilderFactory = new VespaClientBuilderFactory(); - @Inject - public ConfigConvergenceChecker() { - this(ConfigConvergenceChecker::createStateApi); - } + private final Executor responseHandlerExecutor = + Executors.newSingleThreadExecutor(new DaemonThreadFactory("config-convergence-checker-response-handler-")); + private final ObjectMapper jsonMapper = new ObjectMapper(); - public ConfigConvergenceChecker(StateApiFactory stateApiFactory) { - this.stateApiFactory = stateApiFactory; - } + @Inject + public ConfigConvergenceChecker() {} /** Fetches the active config generation for all services in the given application. */ public Map<ServiceInfo, Long> getServiceConfigGenerations(Application application, Duration timeoutPerService) { @@ -92,62 +100,79 @@ public class ConfigConvergenceChecker extends AbstractComponent { /** Check service identified by host and port in given application */ public JSONResponse getServiceConfigGenerationResponse(Application application, String hostAndPortToCheck, URI requestUrl, Duration timeout) { Long wantedGeneration = application.getApplicationGeneration(); - try { + try (CloseableHttpAsyncClient client = createHttpClient()) { + client.start(); if ( ! hostInApplication(application, hostAndPortToCheck)) return ServiceResponse.createHostNotFoundInAppResponse(requestUrl, hostAndPortToCheck, wantedGeneration); - - long currentGeneration = getServiceGeneration(URI.create("http://" + hostAndPortToCheck), timeout); + long currentGeneration = getServiceGeneration(client, URI.create("http://" + hostAndPortToCheck), timeout).get(); boolean converged = currentGeneration >= wantedGeneration; return ServiceResponse.createOkResponse(requestUrl, hostAndPortToCheck, wantedGeneration, currentGeneration, converged); - } catch (ProcessingException e) { // e.g. if we cannot connect to the service to find generation + } catch (InterruptedException | ExecutionException | CancellationException e) { // e.g. if we cannot connect to the service to find generation return ServiceResponse.createNotFoundResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage()); } catch (Exception e) { return ServiceResponse.createErrorResponse(requestUrl, hostAndPortToCheck, wantedGeneration, e.getMessage()); } } - @Override - public void deconstruct() { - clientBuilderFactory.close(); - } - - @Path(statePath) - public interface StateApi { - @Path(configSubPath) - @GET - JsonNode config(); - } - - public interface StateApiFactory { - StateApi createStateApi(Client client, URI serviceUri); - } - /** Gets service generation for a list of services (in parallel). */ private Map<ServiceInfo, Long> getServiceGenerations(List<ServiceInfo> services, Duration timeout) { - return services.parallelStream() - .collect(Collectors.toMap(service -> service, - service -> { - try { - return getServiceGeneration(URI.create("http://" + service.getHostName() - + ":" + getStatePort(service).get()), timeout); - } - catch (ProcessingException e) { // Cannot connect to service to determine service generation - return -1L; - } - }, - (v1, v2) -> { throw new IllegalStateException("Duplicate keys for values '" + v1 + "' and '" + v2 + "'."); }, - LinkedHashMap::new - )); + try (CloseableHttpAsyncClient client = createHttpClient()) { + client.start(); + List<CompletableFuture<Void>> inprogressRequests = new ArrayList<>(); + ConcurrentMap<ServiceInfo, Long> temporaryResult = new ConcurrentHashMap<>(); + for (ServiceInfo service : services) { + int statePort = getStatePort(service).orElse(0); + if (statePort <= 0) continue; + URI uri = URI.create("http://" + service.getHostName() + ":" + statePort); + CompletableFuture<Void> inprogressRequest = getServiceGeneration(client, uri, timeout) + .handle((result, error) -> { + if (result != null) { + temporaryResult.put(service, result); + } else { + log.log( + LogLevel.DEBUG, + error, + () -> String.format("Failed to retrieve service config generation for '%s': %s", service, error.getMessage())); + temporaryResult.put(service, -1L); + } + return null; + }); + inprogressRequests.add(inprogressRequest); + } + CompletableFuture.allOf(inprogressRequests.toArray(CompletableFuture[]::new)).join(); + return createMapOrderedByServiceList(services, temporaryResult); + } catch (IOException e) { + // Actual client implementation does not throw IOException on close() + throw new UncheckedIOException(e); + } } /** Get service generation of service at given URL */ - private long getServiceGeneration(URI serviceUrl, Duration timeout) { - Client client = createClient(timeout); + private CompletableFuture<Long> getServiceGeneration(CloseableHttpAsyncClient client, URI serviceUrl, Duration timeout) { + SimpleHttpRequest request = SimpleHttpRequests.get(createApiUri(serviceUrl)); + request.setHeader("Connection", "close"); + request.setConfig(createRequestConfig(timeout)); + + // Ignoring returned Future object as we want to use the more flexible CompletableFuture instead + CompletableFuture<SimpleHttpResponse> responsePromise = new CompletableFuture<>(); + client.execute(request, new FutureCallback<>() { + @Override public void completed(SimpleHttpResponse result) { responsePromise.complete(result); } + @Override public void failed(Exception ex) { responsePromise.completeExceptionally(ex); } + @Override public void cancelled() { responsePromise.cancel(false); } + }); + + // Don't do json parsing in http client's thread + return responsePromise.thenApplyAsync(this::handleResponse, responseHandlerExecutor); + } + + private long handleResponse(SimpleHttpResponse response) throws UncheckedIOException { try { - StateApi state = stateApiFactory.createStateApi(client, serviceUrl); - return generationFromContainerState(state.config()); - } finally { - client.close(); + int statusCode = response.getCode(); + if (statusCode != HttpStatus.SC_OK) throw new IOException("Expected status code 200, got " + statusCode); + if (response.getBody() == null) throw new IOException("Response has no content"); + return generationFromContainerState(jsonMapper.readTree(response.getBodyText())); + } catch (IOException e) { + throw new UncheckedIOException(e); } } @@ -166,16 +191,6 @@ public class ConfigConvergenceChecker extends AbstractComponent { return false; } - private Client createClient(Duration timeout) { - return clientBuilderFactory.newBuilder() - .register( - (ClientRequestFilter) ctx -> - ctx.getHeaders().put(HttpHeaders.USER_AGENT, List.of("config-convergence-checker"))) - .property(ClientProperties.CONNECT_TIMEOUT, (int) timeout.toMillis()) - .property(ClientProperties.READ_TIMEOUT, (int) timeout.toMillis()) - .build(); - } - private static Optional<Integer> getStatePort(ServiceInfo service) { return service.getPorts().stream() .filter(port -> port.getTags().contains("state")) @@ -187,9 +202,47 @@ public class ConfigConvergenceChecker extends AbstractComponent { return state.get("config").get("generation").asLong(-1); } - private static StateApi createStateApi(Client client, URI uri) { - WebTarget target = client.target(uri); - return WebResourceFactory.newResource(StateApi.class, target); + private static Map<ServiceInfo, Long> createMapOrderedByServiceList( + List<ServiceInfo> services, ConcurrentMap<ServiceInfo, Long> result) { + Map<ServiceInfo, Long> orderedResult = new LinkedHashMap<>(); + for (ServiceInfo service : services) { + Long generation = result.get(service); + if (generation != null) { + orderedResult.put(service, generation); + } + } + return orderedResult; + } + + private static URI createApiUri(URI serviceUrl) { + try { + return new URIBuilder(serviceUrl) + .setPath("/state/v1/config") + .build(); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + } + + private static RequestConfig createRequestConfig(Duration timeout) { + return RequestConfig.custom() + .setConnectionRequestTimeout(Timeout.ofSeconds(1)) + .setResponseTimeout(Timeout.ofMilliseconds(timeout.toMillis())) + .setConnectTimeout(Timeout.ofSeconds(1)) + .build(); + } + + private static CloseableHttpAsyncClient createHttpClient() { + return VespaAsyncHttpClientBuilder + .create(tlsStrategy -> + PoolingAsyncClientConnectionManagerBuilder.create() + .setMaxConnTotal(100) + .setMaxConnPerRoute(10) + .setTlsStrategy(tlsStrategy) + .build()) + .setUserAgent("config-convergence-checker") + .setConnectionReuseStrategy((request, response, context) -> false) // Disable connection reuse + .build(); } private static class ServiceListResponse extends JSONResponse { diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java index 4b25c36a9d7..8a3a3b6e0ca 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ConfigConvergenceCheckerTest.java @@ -30,8 +30,8 @@ import static com.github.tomakehurst.wiremock.client.WireMock.okJson; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; import static com.yahoo.test.json.JsonTestHelper.assertJsonEquals; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; /** * @author Ulf Lilleengen @@ -183,10 +183,15 @@ public class ConfigConvergenceCheckerTest { .withBody("response too slow"))); HttpResponse response = checker.getServiceConfigGenerationResponse(application, hostAndPort(service), requestUrl, Duration.ofMillis(1)); // Message contained in a SocketTimeoutException may differ across platforms, so we do a partial match of the response here - assertResponse((responseBody) -> assertTrue("Response matches", responseBody.startsWith( - "{\"url\":\"" + requestUrl.toString() + "\",\"host\":\"" + hostAndPort(requestUrl) + - "\",\"wantedGeneration\":3,\"error\":\"java.net.SocketTimeoutException") && - responseBody.endsWith("\"}")), 404, response); + assertResponse( + responseBody -> + assertThat(responseBody) + .startsWith("{\"url\":\"" + requestUrl.toString() + "\",\"host\":\"" + hostAndPort(requestUrl) + + "\",\"wantedGeneration\":3,\"error\":\"") + .contains("java.net.SocketTimeoutException: 1 MILLISECONDS") + .endsWith("\"}"), + 404, + response); } private URI testServer() { diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java index 3e98c1a6533..910c4b069e3 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java @@ -1,7 +1,6 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.http.v2; -import com.fasterxml.jackson.databind.ObjectMapper; import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.component.Version; import com.yahoo.config.model.api.ModelFactory; @@ -15,7 +14,6 @@ import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.jdisc.Response; import com.yahoo.jdisc.http.HttpRequest.Method; import com.yahoo.test.ManualClock; -import com.yahoo.test.json.JsonTestHelper; import com.yahoo.vespa.config.server.ApplicationRepository; import com.yahoo.vespa.config.server.MockLogRetriever; import com.yahoo.vespa.config.server.MockProvisioner; @@ -25,7 +23,6 @@ import com.yahoo.vespa.config.server.application.ApplicationCuratorDatabase; import com.yahoo.vespa.config.server.application.ApplicationReindexing; import com.yahoo.vespa.config.server.application.ClusterReindexing; import com.yahoo.vespa.config.server.application.ClusterReindexing.Status; -import com.yahoo.vespa.config.server.application.ConfigConvergenceChecker; import com.yahoo.vespa.config.server.application.HttpProxy; import com.yahoo.vespa.config.server.application.OrchestratorMock; import com.yahoo.vespa.config.server.deploy.DeployTester; @@ -45,13 +42,11 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import javax.ws.rs.client.Client; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.net.URI; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; @@ -638,21 +633,6 @@ public class ApplicationHandlerTest { return createApplicationHandler().handle(createTestRequest(restartUrl, GET)); } - private static class MockStateApiFactory implements ConfigConvergenceChecker.StateApiFactory { - boolean createdApi = false; - @Override - public ConfigConvergenceChecker.StateApi createStateApi(Client client, URI serviceUri) { - createdApi = true; - return () -> { - try { - return new ObjectMapper().readTree("{\"config\":{\"generation\":1}}"); - } catch (IOException e) { - throw new RuntimeException(e); - } - }; - } - } - private ApplicationHandler createApplicationHandler() { return createApplicationHandler(applicationRepository); } diff --git a/http-utils/src/main/java/ai/vespa/util/http/VespaAsyncHttpClientBuilder.java b/http-utils/src/main/java/ai/vespa/util/http/VespaAsyncHttpClientBuilder.java index 51e83bd870a..6c53ea0dc69 100644 --- a/http-utils/src/main/java/ai/vespa/util/http/VespaAsyncHttpClientBuilder.java +++ b/http-utils/src/main/java/ai/vespa/util/http/VespaAsyncHttpClientBuilder.java @@ -66,6 +66,7 @@ public class VespaAsyncHttpClientBuilder { clientBuilder.disableAuthCaching(); clientBuilder.disableRedirectHandling(); clientBuilder.setConnectionManager(factory.create(tlsStrategy)); + clientBuilder.setConnectionManagerShared(false); return clientBuilder; } |