diff options
Diffstat (limited to 'configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java')
-rw-r--r-- | configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java | 90 |
1 files changed, 74 insertions, 16 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java index ad14cf4aab6..a49af0a0e51 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.yahoo.component.AbstractComponent; import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.config.model.api.ApplicationClusterInfo; import com.yahoo.config.model.api.HostInfo; import com.yahoo.config.model.api.PortInfo; import com.yahoo.config.model.api.ServiceInfo; @@ -39,14 +40,17 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER; import static com.yahoo.config.model.api.container.ContainerServiceType.CONTAINER; import static com.yahoo.config.model.api.container.ContainerServiceType.LOGSERVER_CONTAINER; +import static com.yahoo.config.model.api.container.ContainerServiceType.METRICS_PROXY_CONTAINER; import static com.yahoo.config.model.api.container.ContainerServiceType.QRSERVER; /** @@ -65,13 +69,14 @@ public class ConfigConvergenceChecker extends AbstractComponent { QRSERVER.serviceName, LOGSERVER_CONTAINER.serviceName, CLUSTERCONTROLLER_CONTAINER.serviceName, + METRICS_PROXY_CONTAINER.serviceName, "searchnode", "storagenode", "distributor" ); - private final Executor responseHandlerExecutor = + private final ExecutorService responseHandlerExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("config-convergence-checker-response-handler-")); private final ObjectMapper jsonMapper = new ObjectMapper(); @@ -80,20 +85,43 @@ public class ConfigConvergenceChecker extends AbstractComponent { /** Fetches the active config generation for all services in the given application. */ public Map<ServiceInfo, Long> getServiceConfigGenerations(Application application, Duration timeoutPerService) { + return getServiceConfigGenerations(application, timeoutPerService, true); + } + + /** + * Fetches the active config generation for all services in the given application. Will not check services + * which defer config changes until restart if checkAll is false. + */ + private Map<ServiceInfo, Long> getServiceConfigGenerations(Application application, Duration timeoutPerService, boolean checkAll) { List<ServiceInfo> servicesToCheck = new ArrayList<>(); application.getModel().getHosts() .forEach(host -> host.getServices().stream() .filter(service -> serviceTypesToCheck.contains(service.getServiceType())) + .filter(serviceInfo -> shouldCheckService(checkAll, application, serviceInfo)) .forEach(service -> getStatePort(service).ifPresent(port -> servicesToCheck.add(service)))); + log.log(Level.FINE, "Services to check for config convergence: " + servicesToCheck); return getServiceGenerations(servicesToCheck, timeoutPerService); } - /** Check all services in given application. Returns the minimum current generation of all services */ - public ServiceListResponse getServiceConfigGenerations(Application application, URI uri, Duration timeoutPerService) { - Map<ServiceInfo, Long> currentGenerations = getServiceConfigGenerations(application, timeoutPerService); + /** Checks all services in given application. Returns the minimum current generation of all services */ + public ServiceListResponse checkConvergenceForAllServices(Application application, Duration timeoutPerService) { + return checkConvergence(application, timeoutPerService, true); + } + + /** + * Checks services except those which defer config changes until restart in the given application. + * Returns the minimum current generation of those services. + */ + public ServiceListResponse checkConvergenceUnlessDeferringChangesUntilRestart(Application application) { + Duration timeoutPerService = Duration.ofSeconds(10); + return checkConvergence(application, timeoutPerService, false); + } + + private ServiceListResponse checkConvergence(Application application, Duration timeoutPerService, boolean checkAll) { + Map<ServiceInfo, Long> currentGenerations = getServiceConfigGenerations(application, timeoutPerService, checkAll); long currentGeneration = currentGenerations.values().stream().mapToLong(Long::longValue).min().orElse(-1); - return new ServiceListResponse(currentGenerations, uri, application.getApplicationGeneration(), currentGeneration); + return new ServiceListResponse(currentGenerations, application.getApplicationGeneration(), currentGeneration); } /** Check service identified by host and port in given application */ @@ -113,6 +141,26 @@ public class ConfigConvergenceChecker extends AbstractComponent { } } + private boolean shouldCheckService(boolean checkServicesWithDeferChangesUntilRestart, Application application, ServiceInfo serviceInfo) { + if (checkServicesWithDeferChangesUntilRestart) return true; + if (isNotContainer(serviceInfo)) return true; + return serviceIsInClusterWhichShouldBeChecked(application, serviceInfo); + } + + private boolean isNotContainer(ServiceInfo serviceInfo) { + return ! List.of(CONTAINER.serviceName, QRSERVER.serviceName, METRICS_PROXY_CONTAINER).contains(serviceInfo.getServiceType()); + } + + // Don't check service in a cluster which uses restartOnDeploy (new config will not be used until service is restarted) + private boolean serviceIsInClusterWhichShouldBeChecked(Application application, ServiceInfo serviceInfo) { + Set<ApplicationClusterInfo> excludeFromChecking = application.getModel().applicationClusterInfo() + .stream() + .filter(ApplicationClusterInfo::getDeferChangesUntilRestart) + .collect(Collectors.toSet()); + + return excludeFromChecking.stream().noneMatch(info -> info.name().equals(serviceInfo.getProperty("clustername").orElse(""))); + } + /** Gets service generation for a list of services (in parallel). */ private Map<ServiceInfo, Long> getServiceGenerations(List<ServiceInfo> services, Duration timeout) { try (CloseableHttpAsyncClient client = createHttpClient()) { @@ -196,6 +244,16 @@ public class ConfigConvergenceChecker extends AbstractComponent { .findFirst(); } + @Override + public void deconstruct() { + responseHandlerExecutor.shutdown(); + try { + responseHandlerExecutor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.log(Level.WARNING, "Unable to shutdown executor", e); + } + } + private static long generationFromContainerState(JsonNode state) { return state.get("config").get("generation").asLong(-1); } @@ -256,23 +314,23 @@ public class ConfigConvergenceChecker extends AbstractComponent { public final boolean converged; public final Optional<String> errorMessage; - public ServiceResponse(Status status, Long wantedGeneration) { - this(status, wantedGeneration, 0L); + public ServiceResponse(Status status, long wantedGeneration) { + this(status, wantedGeneration, 0); } - public ServiceResponse(Status status, Long wantedGeneration, Long currentGeneration) { + public ServiceResponse(Status status, long wantedGeneration, long currentGeneration) { this(status, wantedGeneration, currentGeneration, false); } - public ServiceResponse(Status status, Long wantedGeneration, Long currentGeneration, boolean converged) { + public ServiceResponse(Status status, long wantedGeneration, long currentGeneration, boolean converged) { this(status, wantedGeneration, currentGeneration, converged, Optional.empty()); } - public ServiceResponse(Status status, Long wantedGeneration, String errorMessage) { - this(status, wantedGeneration, 0L, false, Optional.ofNullable(errorMessage)); + public ServiceResponse(Status status, long wantedGeneration, String errorMessage) { + this(status, wantedGeneration, 0, false, Optional.ofNullable(errorMessage)); } - private ServiceResponse(Status status, Long wantedGeneration, Long currentGeneration, boolean converged, Optional<String> errorMessage) { + private ServiceResponse(Status status, long wantedGeneration, long currentGeneration, boolean converged, Optional<String> errorMessage) { this.status = status; this.wantedGeneration = wantedGeneration; this.currentGeneration = currentGeneration; @@ -285,15 +343,15 @@ public class ConfigConvergenceChecker extends AbstractComponent { public static class ServiceListResponse { public final List<Service> services = new ArrayList<>(); - public final URI uri; public final long wantedGeneration; public final long currentGeneration; + public final boolean converged; - public ServiceListResponse(Map<ServiceInfo, Long> services, URI uri, long wantedGeneration, long currentGeneration) { + public ServiceListResponse(Map<ServiceInfo, Long> services, long wantedGeneration, long currentGeneration) { services.forEach((key, value) -> this.services.add(new Service(key, value))); - this.uri = uri; this.wantedGeneration = wantedGeneration; this.currentGeneration = currentGeneration; + this.converged = currentGeneration >= wantedGeneration; } public List<Service> services() { return services; } |