diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2020-11-25 16:19:30 +0100 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2020-11-25 16:19:30 +0100 |
commit | 41c6840d3cd4f164d582794af18932c33ef4db12 (patch) | |
tree | 046f453adde25f142e4e37e54ddb780c73c8f5d9 | |
parent | 93e887cb2d7ae0177eb70450b50007b1bff66985 (diff) |
Don't reuse clients
The unit tests never closes the config convergence checker, causing stale connections to eventually exhaust the limit for max open files.
-rw-r--r-- | configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java | 73 |
1 files changed, 34 insertions, 39 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java index 1305503324c..484204e6a53 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java @@ -70,15 +70,13 @@ public class ConfigConvergenceChecker extends AbstractComponent { "distributor" ); - private final CloseableHttpAsyncClient client = createHttpClient(); + private final Executor responseHandlerExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("config-convergence-checker-response-handler-")); private final ObjectMapper jsonMapper = new ObjectMapper(); @Inject - public ConfigConvergenceChecker() { - this.client.start(); - } + public ConfigConvergenceChecker() {} /** Fetches the active config generation for all services in the given application. */ public Map<ServiceInfo, Long> getServiceConfigGenerations(Application application, Duration timeoutPerService) { @@ -102,10 +100,11 @@ public class ConfigConvergenceChecker extends AbstractComponent { /** Check service identified by host and port in given application */ public JSONResponse getServiceConfigGenerationResponse(Application application, String hostAndPortToCheck, URI requestUrl, Duration timeout) { Long wantedGeneration = application.getApplicationGeneration(); - try { + try (CloseableHttpAsyncClient client = createHttpClient()) { + client.start(); if ( ! hostInApplication(application, hostAndPortToCheck)) return ServiceResponse.createHostNotFoundInAppResponse(requestUrl, hostAndPortToCheck, wantedGeneration); - long currentGeneration = getServiceGeneration(URI.create("http://" + hostAndPortToCheck), timeout).get(); + long currentGeneration = getServiceGeneration(client, URI.create("http://" + hostAndPortToCheck), timeout).get(); boolean converged = currentGeneration >= wantedGeneration; return ServiceResponse.createOkResponse(requestUrl, hostAndPortToCheck, wantedGeneration, currentGeneration, converged); } catch (InterruptedException | ExecutionException | CancellationException e) { // e.g. if we cannot connect to the service to find generation @@ -115,45 +114,41 @@ public class ConfigConvergenceChecker extends AbstractComponent { } } - @Override - public void deconstruct() { - try { - client.close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - /** Gets service generation for a list of services (in parallel). */ private Map<ServiceInfo, Long> getServiceGenerations(List<ServiceInfo> services, Duration timeout) { - List<CompletableFuture<Void>> inprogressRequests = new ArrayList<>(); - ConcurrentMap<ServiceInfo, Long> temporaryResult = new ConcurrentHashMap<>(); - for (ServiceInfo service : services) { - int statePort = getStatePort(service).orElse(0); - if (statePort <= 0) continue; - - URI uri = URI.create("http://" + service.getHostName() + ":" + statePort); - CompletableFuture<Void> inprogressRequest = getServiceGeneration(uri, timeout) - .handle((result, error) -> { - if (result != null) { - temporaryResult.put(service, result); - } else { - log.log( - LogLevel.DEBUG, - error, - () -> String.format("Failed to retrieve service config generation for '%s': %s", service, error.getMessage())); - temporaryResult.put(service, -1L); - } - return null; - }); - inprogressRequests.add(inprogressRequest); + try (CloseableHttpAsyncClient client = createHttpClient()) { + client.start(); + List<CompletableFuture<Void>> inprogressRequests = new ArrayList<>(); + ConcurrentMap<ServiceInfo, Long> temporaryResult = new ConcurrentHashMap<>(); + for (ServiceInfo service : services) { + int statePort = getStatePort(service).orElse(0); + if (statePort <= 0) continue; + URI uri = URI.create("http://" + service.getHostName() + ":" + statePort); + CompletableFuture<Void> inprogressRequest = getServiceGeneration(client, uri, timeout) + .handle((result, error) -> { + if (result != null) { + temporaryResult.put(service, result); + } else { + log.log( + LogLevel.DEBUG, + error, + () -> String.format("Failed to retrieve service config generation for '%s': %s", service, error.getMessage())); + temporaryResult.put(service, -1L); + } + return null; + }); + inprogressRequests.add(inprogressRequest); + } + CompletableFuture.allOf(inprogressRequests.toArray(CompletableFuture[]::new)).join(); + return createMapOrderedByServiceList(services, temporaryResult); + } catch (IOException e) { + // Actual client implementation does not throw IOException on close() + throw new UncheckedIOException(e); } - CompletableFuture.allOf(inprogressRequests.toArray(CompletableFuture[]::new)).join(); - return createMapOrderedByServiceList(services, temporaryResult); } /** Get service generation of service at given URL */ - private CompletableFuture<Long> getServiceGeneration(URI serviceUrl, Duration timeout) { + private CompletableFuture<Long> getServiceGeneration(CloseableHttpAsyncClient client, URI serviceUrl, Duration timeout) { SimpleHttpRequest request = SimpleHttpRequests.get(createApiUri(serviceUrl)); request.setHeader("Connection", "close"); request.setConfig(createRequestConfig(timeout)); |