summaryrefslogtreecommitdiffstats
path: root/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
diff options
context:
space:
mode:
Diffstat (limited to 'configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java90
1 files changed, 74 insertions, 16 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
index ad14cf4aab6..a49af0a0e51 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
@@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.yahoo.component.AbstractComponent;
import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.config.model.api.ApplicationClusterInfo;
import com.yahoo.config.model.api.HostInfo;
import com.yahoo.config.model.api.PortInfo;
import com.yahoo.config.model.api.ServiceInfo;
@@ -39,14 +40,17 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import static com.yahoo.config.model.api.container.ContainerServiceType.CLUSTERCONTROLLER_CONTAINER;
import static com.yahoo.config.model.api.container.ContainerServiceType.CONTAINER;
import static com.yahoo.config.model.api.container.ContainerServiceType.LOGSERVER_CONTAINER;
+import static com.yahoo.config.model.api.container.ContainerServiceType.METRICS_PROXY_CONTAINER;
import static com.yahoo.config.model.api.container.ContainerServiceType.QRSERVER;
/**
@@ -65,13 +69,14 @@ public class ConfigConvergenceChecker extends AbstractComponent {
QRSERVER.serviceName,
LOGSERVER_CONTAINER.serviceName,
CLUSTERCONTROLLER_CONTAINER.serviceName,
+ METRICS_PROXY_CONTAINER.serviceName,
"searchnode",
"storagenode",
"distributor"
);
- private final Executor responseHandlerExecutor =
+ private final ExecutorService responseHandlerExecutor =
Executors.newSingleThreadExecutor(new DaemonThreadFactory("config-convergence-checker-response-handler-"));
private final ObjectMapper jsonMapper = new ObjectMapper();
@@ -80,20 +85,43 @@ public class ConfigConvergenceChecker extends AbstractComponent {
/** Fetches the active config generation for all services in the given application. */
public Map<ServiceInfo, Long> getServiceConfigGenerations(Application application, Duration timeoutPerService) {
+ return getServiceConfigGenerations(application, timeoutPerService, true);
+ }
+
+ /**
+ * Fetches the active config generation for all services in the given application. Will not check services
+ * which defer config changes until restart if checkAll is false.
+ */
+ private Map<ServiceInfo, Long> getServiceConfigGenerations(Application application, Duration timeoutPerService, boolean checkAll) {
List<ServiceInfo> servicesToCheck = new ArrayList<>();
application.getModel().getHosts()
.forEach(host -> host.getServices().stream()
.filter(service -> serviceTypesToCheck.contains(service.getServiceType()))
+ .filter(serviceInfo -> shouldCheckService(checkAll, application, serviceInfo))
.forEach(service -> getStatePort(service).ifPresent(port -> servicesToCheck.add(service))));
+ log.log(Level.FINE, "Services to check for config convergence: " + servicesToCheck);
return getServiceGenerations(servicesToCheck, timeoutPerService);
}
- /** Check all services in given application. Returns the minimum current generation of all services */
- public ServiceListResponse getServiceConfigGenerations(Application application, URI uri, Duration timeoutPerService) {
- Map<ServiceInfo, Long> currentGenerations = getServiceConfigGenerations(application, timeoutPerService);
+ /** Checks all services in given application. Returns the minimum current generation of all services */
+ public ServiceListResponse checkConvergenceForAllServices(Application application, Duration timeoutPerService) {
+ return checkConvergence(application, timeoutPerService, true);
+ }
+
+ /**
+ * Checks services except those which defer config changes until restart in the given application.
+ * Returns the minimum current generation of those services.
+ */
+ public ServiceListResponse checkConvergenceUnlessDeferringChangesUntilRestart(Application application) {
+ Duration timeoutPerService = Duration.ofSeconds(10);
+ return checkConvergence(application, timeoutPerService, false);
+ }
+
+ private ServiceListResponse checkConvergence(Application application, Duration timeoutPerService, boolean checkAll) {
+ Map<ServiceInfo, Long> currentGenerations = getServiceConfigGenerations(application, timeoutPerService, checkAll);
long currentGeneration = currentGenerations.values().stream().mapToLong(Long::longValue).min().orElse(-1);
- return new ServiceListResponse(currentGenerations, uri, application.getApplicationGeneration(), currentGeneration);
+ return new ServiceListResponse(currentGenerations, application.getApplicationGeneration(), currentGeneration);
}
/** Check service identified by host and port in given application */
@@ -113,6 +141,26 @@ public class ConfigConvergenceChecker extends AbstractComponent {
}
}
+ private boolean shouldCheckService(boolean checkServicesWithDeferChangesUntilRestart, Application application, ServiceInfo serviceInfo) {
+ if (checkServicesWithDeferChangesUntilRestart) return true;
+ if (isNotContainer(serviceInfo)) return true;
+ return serviceIsInClusterWhichShouldBeChecked(application, serviceInfo);
+ }
+
+ private boolean isNotContainer(ServiceInfo serviceInfo) {
+ return ! List.of(CONTAINER.serviceName, QRSERVER.serviceName, METRICS_PROXY_CONTAINER).contains(serviceInfo.getServiceType());
+ }
+
+ // Don't check service in a cluster which uses restartOnDeploy (new config will not be used until service is restarted)
+ private boolean serviceIsInClusterWhichShouldBeChecked(Application application, ServiceInfo serviceInfo) {
+ Set<ApplicationClusterInfo> excludeFromChecking = application.getModel().applicationClusterInfo()
+ .stream()
+ .filter(ApplicationClusterInfo::getDeferChangesUntilRestart)
+ .collect(Collectors.toSet());
+
+ return excludeFromChecking.stream().noneMatch(info -> info.name().equals(serviceInfo.getProperty("clustername").orElse("")));
+ }
+
/** Gets service generation for a list of services (in parallel). */
private Map<ServiceInfo, Long> getServiceGenerations(List<ServiceInfo> services, Duration timeout) {
try (CloseableHttpAsyncClient client = createHttpClient()) {
@@ -196,6 +244,16 @@ public class ConfigConvergenceChecker extends AbstractComponent {
.findFirst();
}
+ @Override
+ public void deconstruct() {
+ responseHandlerExecutor.shutdown();
+ try {
+ responseHandlerExecutor.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.log(Level.WARNING, "Unable to shutdown executor", e);
+ }
+ }
+
private static long generationFromContainerState(JsonNode state) {
return state.get("config").get("generation").asLong(-1);
}
@@ -256,23 +314,23 @@ public class ConfigConvergenceChecker extends AbstractComponent {
public final boolean converged;
public final Optional<String> errorMessage;
- public ServiceResponse(Status status, Long wantedGeneration) {
- this(status, wantedGeneration, 0L);
+ public ServiceResponse(Status status, long wantedGeneration) {
+ this(status, wantedGeneration, 0);
}
- public ServiceResponse(Status status, Long wantedGeneration, Long currentGeneration) {
+ public ServiceResponse(Status status, long wantedGeneration, long currentGeneration) {
this(status, wantedGeneration, currentGeneration, false);
}
- public ServiceResponse(Status status, Long wantedGeneration, Long currentGeneration, boolean converged) {
+ public ServiceResponse(Status status, long wantedGeneration, long currentGeneration, boolean converged) {
this(status, wantedGeneration, currentGeneration, converged, Optional.empty());
}
- public ServiceResponse(Status status, Long wantedGeneration, String errorMessage) {
- this(status, wantedGeneration, 0L, false, Optional.ofNullable(errorMessage));
+ public ServiceResponse(Status status, long wantedGeneration, String errorMessage) {
+ this(status, wantedGeneration, 0, false, Optional.ofNullable(errorMessage));
}
- private ServiceResponse(Status status, Long wantedGeneration, Long currentGeneration, boolean converged, Optional<String> errorMessage) {
+ private ServiceResponse(Status status, long wantedGeneration, long currentGeneration, boolean converged, Optional<String> errorMessage) {
this.status = status;
this.wantedGeneration = wantedGeneration;
this.currentGeneration = currentGeneration;
@@ -285,15 +343,15 @@ public class ConfigConvergenceChecker extends AbstractComponent {
public static class ServiceListResponse {
public final List<Service> services = new ArrayList<>();
- public final URI uri;
public final long wantedGeneration;
public final long currentGeneration;
+ public final boolean converged;
- public ServiceListResponse(Map<ServiceInfo, Long> services, URI uri, long wantedGeneration, long currentGeneration) {
+ public ServiceListResponse(Map<ServiceInfo, Long> services, long wantedGeneration, long currentGeneration) {
services.forEach((key, value) -> this.services.add(new Service(key, value)));
- this.uri = uri;
this.wantedGeneration = wantedGeneration;
this.currentGeneration = currentGeneration;
+ this.converged = currentGeneration >= wantedGeneration;
}
public List<Service> services() { return services; }