diff options
126 files changed, 2881 insertions, 735 deletions
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ConfigChangeAction.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ConfigChangeAction.java index e453aaf5bdb..ea4b6a2b02d 100644 --- a/config-model-api/src/main/java/com/yahoo/config/model/api/ConfigChangeAction.java +++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ConfigChangeAction.java @@ -38,4 +38,7 @@ public interface ConfigChangeAction { /** Returns whether this change should be allowed */ boolean allowed(); + /** Returns whether this change should be ignored for internal redeploy */ + boolean ignoreForInternalRedeploy(); + } diff --git a/config-model/src/main/java/com/yahoo/searchdefinition/SearchBuilder.java b/config-model/src/main/java/com/yahoo/searchdefinition/SearchBuilder.java index 779ae54c242..2225997e685 100644 --- a/config-model/src/main/java/com/yahoo/searchdefinition/SearchBuilder.java +++ b/config-model/src/main/java/com/yahoo/searchdefinition/SearchBuilder.java @@ -408,7 +408,15 @@ public class SearchBuilder { RankProfileRegistry rankProfileRegistry, QueryProfileRegistry queryProfileRegistry, DeployLogger logger) throws IOException, ParseException { - SearchBuilder builder = new SearchBuilder(MockApplicationPackage.fromSearchDefinitionDirectory(dir), + return createFromDirectory(dir, rankProfileRegistry, queryProfileRegistry, logger, MockApplicationPackage.fromSearchDefinitionDirectory(dir)); + } + + public static SearchBuilder createFromDirectory(String dir, + RankProfileRegistry rankProfileRegistry, + QueryProfileRegistry queryProfileRegistry, + DeployLogger logger, + ApplicationPackage applicationPackage) throws IOException, ParseException { + SearchBuilder builder = new SearchBuilder(applicationPackage, rankProfileRegistry, queryProfileRegistry); for (Iterator<Path> i = Files.list(new File(dir).toPath()).filter(p -> p.getFileName().toString().endsWith(".sd")).iterator(); i.hasNext(); ) { diff --git a/config-model/src/main/java/com/yahoo/searchdefinition/derived/DerivedConfiguration.java b/config-model/src/main/java/com/yahoo/searchdefinition/derived/DerivedConfiguration.java index fc8710fa1a1..4d6ce783947 100644 --- a/config-model/src/main/java/com/yahoo/searchdefinition/derived/DerivedConfiguration.java +++ b/config-model/src/main/java/com/yahoo/searchdefinition/derived/DerivedConfiguration.java @@ -29,7 +29,7 @@ import java.util.logging.Level; */ public class DerivedConfiguration { - private Search search; + private final Search search; private Summaries summaries; private SummaryMap summaryMap; private Juniperrc juniperrc; @@ -41,7 +41,7 @@ public class DerivedConfiguration { private VsmSummary streamingSummary; private IndexSchema indexSchema; private ImportedFields importedFields; - private QueryProfileRegistry queryProfiles; + private final QueryProfileRegistry queryProfiles; /** * Creates a complete derived configuration from a search definition. diff --git a/config-model/src/main/java/com/yahoo/vespa/model/application/validation/change/ContainerRestartValidator.java b/config-model/src/main/java/com/yahoo/vespa/model/application/validation/change/ContainerRestartValidator.java index 5cee08ea9af..3176ad9f912 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/application/validation/change/ContainerRestartValidator.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/application/validation/change/ContainerRestartValidator.java @@ -31,7 +31,7 @@ public class ContainerRestartValidator implements ChangeValidator { } private static ConfigChangeAction createConfigChangeAction(Container container) { - return new VespaRestartAction(createMessage(container), container.getServiceInfo()); + return new VespaRestartAction(createMessage(container), container.getServiceInfo(), true); } private static String createMessage(Container container) { diff --git a/config-model/src/main/java/com/yahoo/vespa/model/application/validation/change/VespaRefeedAction.java b/config-model/src/main/java/com/yahoo/vespa/model/application/validation/change/VespaRefeedAction.java index 0fd38e5dbdd..be43c6eddfb 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/application/validation/change/VespaRefeedAction.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/application/validation/change/VespaRefeedAction.java @@ -6,7 +6,6 @@ import com.yahoo.config.model.api.ServiceInfo; import com.yahoo.config.application.api.ValidationOverrides; import java.time.Instant; -import java.util.Collections; import java.util.List; /** @@ -27,31 +26,29 @@ public class VespaRefeedAction extends VespaConfigChangeAction implements Config private final String documentType; private final boolean allowed; - private final Instant now; - private VespaRefeedAction(String name, String message, List<ServiceInfo> services, String documentType, boolean allowed, Instant now) { + private VespaRefeedAction(String name, String message, List<ServiceInfo> services, String documentType, boolean allowed) { super(message, services); this.name = name; this.documentType = documentType; this.allowed = allowed; - this.now = now; } /** Creates a refeed action with some missing information */ // TODO: We should require document type or model its absence properly public static VespaRefeedAction of(String name, ValidationOverrides overrides, String message, Instant now) { - return new VespaRefeedAction(name, message, Collections.emptyList(), "", overrides.allows(name, now), now); + return new VespaRefeedAction(name, message, List.of(), "", overrides.allows(name, now)); } /** Creates a refeed action */ public static VespaRefeedAction of(String name, ValidationOverrides overrides, String message, List<ServiceInfo> services, String documentType, Instant now) { - return new VespaRefeedAction(name, message, services, documentType, overrides.allows(name, now), now); + return new VespaRefeedAction(name, message, services, documentType, overrides.allows(name, now)); } @Override public VespaConfigChangeAction modifyAction(String newMessage, List<ServiceInfo> newServices, String documentType) { - return new VespaRefeedAction(name, newMessage, newServices, documentType, allowed, now); + return new VespaRefeedAction(name, newMessage, newServices, documentType, allowed); } @Override @@ -64,6 +61,11 @@ public class VespaRefeedAction extends VespaConfigChangeAction implements Config public boolean allowed() { return allowed; } @Override + public boolean ignoreForInternalRedeploy() { + return false; + } + + @Override public String toString() { return super.toString() + ", documentType='" + documentType + "'"; } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/application/validation/change/VespaRestartAction.java b/config-model/src/main/java/com/yahoo/vespa/model/application/validation/change/VespaRestartAction.java index d42db15d062..3ea18cac1d6 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/application/validation/change/VespaRestartAction.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/application/validation/change/VespaRestartAction.java @@ -4,8 +4,6 @@ package com.yahoo.vespa.model.application.validation.change; import com.yahoo.config.model.api.ConfigChangeRestartAction; import com.yahoo.config.model.api.ServiceInfo; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; /** @@ -15,16 +13,24 @@ import java.util.List; */ public class VespaRestartAction extends VespaConfigChangeAction implements ConfigChangeRestartAction { + private final boolean ignoreForInternalRedeploy; + public VespaRestartAction(String message) { - super(message, new ArrayList<>()); + this(message, List.of()); } public VespaRestartAction(String message, ServiceInfo service) { - super(message, Collections.singletonList(service)); + this(message, List.of(service)); + } + + public VespaRestartAction(String message, ServiceInfo services, boolean ignoreForInternalRedeploy) { + super(message, List.of(services)); + this.ignoreForInternalRedeploy = ignoreForInternalRedeploy; } public VespaRestartAction(String message, List<ServiceInfo> services) { super(message, services); + this.ignoreForInternalRedeploy = false; } @Override @@ -32,4 +38,25 @@ public class VespaRestartAction extends VespaConfigChangeAction implements Confi return new VespaRestartAction(newMessage, newServices); } + @Override + public boolean ignoreForInternalRedeploy() { + return ignoreForInternalRedeploy; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + + VespaRestartAction that = (VespaRestartAction) o; + return ignoreForInternalRedeploy == that.ignoreForInternalRedeploy; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (ignoreForInternalRedeploy ? 1 : 0); + return result; + } } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/ml/ModelName.java b/config-model/src/main/java/com/yahoo/vespa/model/ml/ModelName.java index 2c7dc6b337d..7e33faadfc0 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/ml/ModelName.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/ml/ModelName.java @@ -11,9 +11,9 @@ import com.yahoo.path.Path; public class ModelName { /** The namespace, or null if none */ - private String namespace; - private String name; - private String fullName; + private final String namespace; + private final String name; + private final String fullName; public ModelName(String name) { this(null, name); diff --git a/config-model/src/test/java/com/yahoo/searchdefinition/derived/NeuralNetTestCase.java b/config-model/src/test/java/com/yahoo/searchdefinition/derived/NeuralNetTestCase.java index a6171901d2d..3202317284f 100644 --- a/config-model/src/test/java/com/yahoo/searchdefinition/derived/NeuralNetTestCase.java +++ b/config-model/src/test/java/com/yahoo/searchdefinition/derived/NeuralNetTestCase.java @@ -2,7 +2,6 @@ package com.yahoo.searchdefinition.derived; import com.yahoo.search.Query; -import com.yahoo.search.query.profile.QueryProfileRegistry; import com.yahoo.search.query.profile.compiled.CompiledQueryProfileRegistry; import com.yahoo.search.query.profile.config.QueryProfileConfigurer; import com.yahoo.searchdefinition.parser.ParseException; diff --git a/config-provisioning/src/main/java/com/yahoo/config/provision/NodeResources.java b/config-provisioning/src/main/java/com/yahoo/config/provision/NodeResources.java index 1c53ff18222..fad276379cc 100644 --- a/config-provisioning/src/main/java/com/yahoo/config/provision/NodeResources.java +++ b/config-provisioning/src/main/java/com/yahoo/config/provision/NodeResources.java @@ -1,10 +1,6 @@ // Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.config.provision; -import java.text.DecimalFormat; -import java.text.FieldPosition; -import java.text.NumberFormat; -import java.util.Locale; import java.util.Objects; import java.util.Optional; diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java index 57502973873..4670184c303 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java @@ -402,15 +402,11 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye TimeoutBudget timeoutBudget, boolean force) { LocalSession localSession = getLocalSession(tenant, sessionId); - Deployment deployment = deployment(localSession, tenant, timeoutBudget.timeLeft(), force); + Deployment deployment = Deployment.prepared(localSession, this, hostProvisioner, tenant, logger, timeoutBudget.timeout(), clock, false, force); deployment.activate(); return localSession.getApplicationId(); } - private Deployment deployment(LocalSession session, Tenant tenant, Duration timeout, boolean force) { - return Deployment.prepared(session, this, hostProvisioner, tenant, logger, timeout, clock, false, force); - } - public Transaction deactivateCurrentActivateNew(Session active, LocalSession prepared, boolean force) { Tenant tenant = tenantRepository.getTenant(prepared.getTenantName()); Transaction transaction = tenant.getSessionRepository().createActivateTransaction(prepared); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java index 98430c863cc..064e945041b 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java @@ -127,7 +127,10 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica return curator.exists(applicationPath(id)); } - /** Returns the active session id for the given application. Returns Optional.empty if application not found or no active session exists. */ + /** + * Returns the active session id for the given application. + * Returns Optional.empty if application not found or no active session exists. + */ public Optional<Long> activeSessionOf(ApplicationId id) { Optional<byte[]> data = curator.getData(applicationPath(id)); return (data.isEmpty() || data.get().length == 0) diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/configchange/RestartActions.java b/configserver/src/main/java/com/yahoo/vespa/config/server/configchange/RestartActions.java index 4db5a5e125f..fab36246a41 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/configchange/RestartActions.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/configchange/RestartActions.java @@ -5,6 +5,7 @@ import com.yahoo.config.model.api.ConfigChangeAction; import com.yahoo.config.model.api.ServiceInfo; import java.util.*; +import java.util.stream.Collectors; /** * Represents all actions to restart services in order to handle a config change. @@ -18,6 +19,7 @@ public class RestartActions { private final String clusterName; private final String clusterType; private final String serviceType; + private final boolean ignoreForInternalRedeploy; private final Set<ServiceInfo> services = new LinkedHashSet<>(); private final Set<String> messages = new TreeSet<>(); @@ -31,10 +33,11 @@ public class RestartActions { return this; } - private Entry(String clusterName, String clusterType, String serviceType) { + private Entry(String clusterName, String clusterType, String serviceType, boolean ignoreForInternalRedeploy) { this.clusterName = clusterName; this.clusterType = clusterType; this.serviceType = serviceType; + this.ignoreForInternalRedeploy = ignoreForInternalRedeploy; } public String getClusterName() { @@ -49,6 +52,10 @@ public class RestartActions { return serviceType; } + public boolean ignoreForInternalRedeploy() { + return ignoreForInternalRedeploy; + } + public Set<ServiceInfo> getServices() { return services; } @@ -59,28 +66,19 @@ public class RestartActions { } - private Entry addEntry(ServiceInfo service) { - String clusterName = service.getProperty("clustername").orElse(""); - String clusterType = service.getProperty("clustertype").orElse(""); - String entryId = clusterType + "." + clusterName + "." + service.getServiceType(); - Entry entry = actions.get(entryId); - if (entry == null) { - entry = new Entry(clusterName, clusterType, service.getServiceType()); - actions.put(entryId, entry); - } - return entry; - } - private final Map<String, Entry> actions = new TreeMap<>(); - public RestartActions() { + public RestartActions() { } + + private RestartActions(Map<String, Entry> actions) { + this.actions.putAll(actions); } public RestartActions(List<ConfigChangeAction> actions) { for (ConfigChangeAction action : actions) { if (action.getType().equals(ConfigChangeAction.Type.RESTART)) { for (ServiceInfo service : action.getServices()) { - addEntry(service). + addEntry(service, action.ignoreForInternalRedeploy()). addService(service). addMessage(action.getMessage()); } @@ -88,6 +86,24 @@ public class RestartActions { } } + private Entry addEntry(ServiceInfo service, boolean ignoreForInternalRedeploy) { + String clusterName = service.getProperty("clustername").orElse(""); + String clusterType = service.getProperty("clustertype").orElse(""); + String entryId = clusterType + "." + clusterName + "." + service.getServiceType() + "." + ignoreForInternalRedeploy; + Entry entry = actions.get(entryId); + if (entry == null) { + entry = new Entry(clusterName, clusterType, service.getServiceType(), ignoreForInternalRedeploy); + actions.put(entryId, entry); + } + return entry; + } + + public RestartActions useForInternalRestart(boolean useForInternalRestart) { + return new RestartActions(actions.entrySet().stream() + .filter(entry -> !useForInternalRestart || !entry.getValue().ignoreForInternalRedeploy()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + } + public List<Entry> getEntries() { return new ArrayList<>(actions.values()); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/Deployment.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/Deployment.java index 5f4f1298528..3726ea97fcc 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/Deployment.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/Deployment.java @@ -52,12 +52,14 @@ public class Deployment implements com.yahoo.config.provision.Deployment { private final Tenant tenant; private final DeployLogger deployLogger; private final Clock clock; + private final boolean internalRedeploy; private boolean prepared; private ConfigChangeActions configChangeActions; private Deployment(LocalSession session, ApplicationRepository applicationRepository, Supplier<PrepareParams> params, - Optional<Provisioner> provisioner, Tenant tenant, DeployLogger deployLogger, Clock clock, boolean prepared) { + Optional<Provisioner> provisioner, Tenant tenant, DeployLogger deployLogger, Clock clock, + boolean internalRedeploy, boolean prepared) { this.session = session; this.applicationRepository = applicationRepository; this.params = params; @@ -65,26 +67,27 @@ public class Deployment implements com.yahoo.config.provision.Deployment { this.tenant = tenant; this.deployLogger = deployLogger; this.clock = clock; + this.internalRedeploy = internalRedeploy; this.prepared = prepared; } public static Deployment unprepared(LocalSession session, ApplicationRepository applicationRepository, Optional<Provisioner> provisioner, Tenant tenant, PrepareParams params, DeployLogger logger, Clock clock) { - return new Deployment(session, applicationRepository, () -> params, provisioner, tenant, logger, clock, false); + return new Deployment(session, applicationRepository, () -> params, provisioner, tenant, logger, clock, false, false); } public static Deployment unprepared(LocalSession session, ApplicationRepository applicationRepository, Optional<Provisioner> provisioner, Tenant tenant, DeployLogger logger, Duration timeout, Clock clock, boolean validate, boolean isBootstrap, boolean internalRestart) { Supplier<PrepareParams> params = createPrepareParams(clock, timeout, session, isBootstrap, !validate, false, internalRestart); - return new Deployment(session, applicationRepository, params, provisioner, tenant, logger, clock, false); + return new Deployment(session, applicationRepository, params, provisioner, tenant, logger, clock, true, false); } public static Deployment prepared(LocalSession session, ApplicationRepository applicationRepository, Optional<Provisioner> provisioner, Tenant tenant, DeployLogger logger, Duration timeout, Clock clock, boolean isBootstrap, boolean force) { Supplier<PrepareParams> params = createPrepareParams(clock, timeout, session, isBootstrap, false, force, false); - return new Deployment(session, applicationRepository, params, provisioner, tenant, logger, clock, true); + return new Deployment(session, applicationRepository, params, provisioner, tenant, logger, clock, false, true); } /** Prepares this. This does nothing if this is already prepared */ @@ -136,17 +139,21 @@ public class Deployment implements com.yahoo.config.provision.Deployment { (previousActiveSession != null ? ". Based on session " + previousActiveSession.getSessionId() : "") + ". File references: " + applicationRepository.getFileReferences(applicationId)); - if (params.internalRestart() && !configChangeActions.getRestartActions().isEmpty()) { - Set<String> hostnames = configChangeActions.getRestartActions().getEntries().stream() - .flatMap(entry -> entry.getServices().stream()) - .map(ServiceInfo::getHostName) - .collect(Collectors.toUnmodifiableSet()); + if (params.internalRestart()) { + RestartActions restartActions = configChangeActions.getRestartActions().useForInternalRestart(internalRedeploy); - provisioner.get().restart(applicationId, HostFilter.from(hostnames, Set.of(), Set.of(), Set.of())); - deployLogger.log(Level.INFO, String.format("Scheduled service restart of %d nodes: %s", - hostnames.size(), hostnames.stream().sorted().collect(Collectors.joining(", ")))); + if (!restartActions.isEmpty()) { + Set<String> hostnames = restartActions.getEntries().stream() + .flatMap(entry -> entry.getServices().stream()) + .map(ServiceInfo::getHostName) + .collect(Collectors.toUnmodifiableSet()); - this.configChangeActions = new ConfigChangeActions(new RestartActions(), configChangeActions.getRefeedActions()); + provisioner.get().restart(applicationId, HostFilter.from(hostnames, Set.of(), Set.of(), Set.of())); + deployLogger.log(Level.INFO, String.format("Scheduled service restart of %d nodes: %s", + hostnames.size(), hostnames.stream().sorted().collect(Collectors.joining(", ")))); + + this.configChangeActions = new ConfigChangeActions(new RestartActions(), configChangeActions.getRefeedActions()); + } } return session.getMetaData().getGeneration(); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ConfigServerMaintenance.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ConfigServerMaintenance.java index 751e983770a..0e75d683478 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ConfigServerMaintenance.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ConfigServerMaintenance.java @@ -27,7 +27,6 @@ public class ConfigServerMaintenance extends AbstractComponent { private final FileDistributionMaintainer fileDistributionMaintainer; private final SessionsMaintainer sessionsMaintainer; private final ApplicationPackageMaintainer applicationPackageMaintainer; - private final LocksMaintainer locksMaintainer; @Inject public ConfigServerMaintenance(ConfigServerBootstrap configServerBootstrap, @@ -41,7 +40,6 @@ public class ConfigServerMaintenance extends AbstractComponent { fileDistributionMaintainer = new FileDistributionMaintainer(applicationRepository, curator, defaults.defaultInterval, flagSource); sessionsMaintainer = new SessionsMaintainer(applicationRepository, curator, Duration.ofSeconds(30), flagSource); applicationPackageMaintainer = new ApplicationPackageMaintainer(applicationRepository, curator, Duration.ofSeconds(30), flagSource); - locksMaintainer = new LocksMaintainer(applicationRepository, curator, Duration.ofSeconds(10), flagSource); } @Override @@ -50,7 +48,6 @@ public class ConfigServerMaintenance extends AbstractComponent { sessionsMaintainer.close(); applicationPackageMaintainer.close(); tenantsMaintainer.close(); - locksMaintainer.close(); } /* diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/LocksMaintainer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/LocksMaintainer.java deleted file mode 100644 index b43d65d4c0a..00000000000 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/LocksMaintainer.java +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.config.server.maintenance; - -import com.yahoo.path.Path; -import com.yahoo.vespa.config.server.ApplicationRepository; -import com.yahoo.vespa.curator.Curator; -import com.yahoo.vespa.flags.FlagSource; -import org.apache.zookeeper.data.Stat; - -import java.time.Duration; -import java.time.Instant; -import java.util.List; -import java.util.Optional; -import java.util.logging.Level; - -/** - * Debug maintainer for logging info about node repository locks. Logs with level FINE, so to actually log something - * one needs to change log levels with vespa-logctl - * - * @author hmusum - */ -public class LocksMaintainer extends ConfigServerMaintainer { - - private final boolean hostedVespa; - private final Curator curator; - - LocksMaintainer(ApplicationRepository applicationRepository, Curator curator, Duration interval, FlagSource flagSource) { - super(applicationRepository, curator, flagSource, Duration.ZERO, interval); - this.hostedVespa = applicationRepository.configserverConfig().hostedVespa(); - this.curator = curator; - } - - @Override - protected boolean maintain() { - if (! hostedVespa) return true; - - Path unallocatedLockPath = Path.fromString("/provision/v1/locks/unallocatedLock"); - logLockInfo(unallocatedLockPath); - - applicationRepository.listApplications().forEach(applicationId -> { - Path applicationLockPath = Path.fromString("/provision/v1/locks").append(applicationId.tenant().value()) - .append(applicationId.application().value()) - .append(applicationId.instance().value()); - logLockInfo(applicationLockPath); - }); - - return true; - } - - private void logLockInfo(Path path) { - List<String> children = curator.getChildren(path); - if (children.size() > 0) - log.log(Level.FINE, path + " has " + children.size() + " locks "); - children.forEach(lockString -> { - Optional<Instant> createTime = Optional.empty(); - Path lockPath = path.append(lockString); - Optional<Stat> stat = curator.getStat(lockPath); - if (stat.isPresent()) { - createTime = Optional.of(Instant.ofEpochMilli(stat.get().getCtime())); - } - log.log(Level.FINE, "Lock /" + lockPath + " created " + createTime.map(Instant::toString).orElse(" at unknown time")); - }); - } - -} diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/ConfigChangeActionsBuilder.java b/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/ConfigChangeActionsBuilder.java index e2c3369d49e..ead1e79a416 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/ConfigChangeActionsBuilder.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/ConfigChangeActionsBuilder.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.config.server.configchange; import com.google.common.collect.ImmutableMap; import com.yahoo.config.model.api.ConfigChangeAction; import com.yahoo.config.model.api.ServiceInfo; +import com.yahoo.vespa.model.application.validation.change.VespaRestartAction; import java.util.ArrayList; import java.util.List; @@ -22,11 +23,17 @@ public class ConfigChangeActionsBuilder { } public ConfigChangeActionsBuilder restart(String message, String clusterName, String clusterType, String serviceType, String serviceName) { - actions.add(new MockRestartAction(message, - List.of(createService(clusterName, clusterType, serviceType, serviceName)))); + return restart(message, clusterName, clusterType, serviceType, serviceName, false); + } + + public ConfigChangeActionsBuilder restart(String message, String clusterName, String clusterType, String serviceType, String serviceName, boolean ignoreForInternalRedeploy) { + actions.add(new VespaRestartAction(message, + createService(clusterName, clusterType, serviceType, serviceName), + ignoreForInternalRedeploy)); return this; } + ConfigChangeActionsBuilder refeed(String name, boolean allowed, String message, String documentType, String clusterName, String serviceName) { actions.add(new MockRefeedAction(name, allowed, diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/MockRefeedAction.java b/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/MockRefeedAction.java index bdf63befd15..904ca10aa1c 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/MockRefeedAction.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/MockRefeedAction.java @@ -30,6 +30,11 @@ public class MockRefeedAction extends MockConfigChangeAction implements ConfigCh public boolean allowed() { return allowed; } @Override + public boolean ignoreForInternalRedeploy() { + return false; + } + + @Override public String getDocumentType() { return documentType; } } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/MockRestartAction.java b/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/MockRestartAction.java deleted file mode 100644 index b1183f91282..00000000000 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/MockRestartAction.java +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.config.server.configchange; - -import com.yahoo.config.model.api.ConfigChangeRestartAction; -import com.yahoo.config.model.api.ServiceInfo; - -import java.util.List; - -/** - * @author geirst - * @since 5.44 - */ -public class MockRestartAction extends MockConfigChangeAction implements ConfigChangeRestartAction { - public MockRestartAction(String message, List<ServiceInfo> services) { - super(message, services); - } -} diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/RestartActionsTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/RestartActionsTest.java index ee0180802af..c19b81aa91b 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/RestartActionsTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/configchange/RestartActionsTest.java @@ -5,8 +5,10 @@ import com.yahoo.config.model.api.ServiceInfo; import org.junit.Test; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -81,4 +83,18 @@ public class RestartActionsTest { assertThat(toString(entries.get(0)), equalTo("content.foo.searchnode:[baz][change]")); assertThat(toString(entries.get(1)), equalTo("search.foo.searchnode:[baz][change]")); } + + @Test + public void use_for_internal_restart_test() { + ConfigChangeActions actions = new ConfigChangeActionsBuilder() + .restart(CHANGE_MSG, CLUSTER, CLUSTER_TYPE, SERVICE_TYPE, SERVICE_NAME) + .restart(CHANGE_MSG, CLUSTER, CLUSTER_TYPE_2, SERVICE_TYPE, SERVICE_NAME, true).build(); + + assertEquals(Set.of(CLUSTER_TYPE, CLUSTER_TYPE_2), + actions.getRestartActions().getEntries().stream().map(RestartActions.Entry::getClusterType).collect(Collectors.toSet())); + assertEquals(Set.of(CLUSTER_TYPE, CLUSTER_TYPE_2), + actions.getRestartActions().useForInternalRestart(false).getEntries().stream().map(RestartActions.Entry::getClusterType).collect(Collectors.toSet())); + assertEquals(Set.of(CLUSTER_TYPE), + actions.getRestartActions().useForInternalRestart(true).getEntries().stream().map(RestartActions.Entry::getClusterType).collect(Collectors.toSet())); + } } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/HostedDeployTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/HostedDeployTest.java index 04d018b71c0..18ca54e8c11 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/HostedDeployTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/HostedDeployTest.java @@ -20,12 +20,12 @@ import com.yahoo.config.provision.Environment; import com.yahoo.config.provision.RegionName; import com.yahoo.config.provision.Zone; import com.yahoo.test.ManualClock; -import com.yahoo.vespa.config.server.configchange.MockRestartAction; import com.yahoo.vespa.config.server.configchange.RestartActions; import com.yahoo.vespa.config.server.http.InvalidApplicationException; import com.yahoo.vespa.config.server.http.v2.PrepareResult; import com.yahoo.vespa.config.server.model.TestModelFactory; import com.yahoo.vespa.config.server.session.PrepareParams; +import com.yahoo.vespa.model.application.validation.change.VespaRestartAction; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -377,8 +377,8 @@ public class HostedDeployTest { new ServiceInfo("serviceName", "serviceType", null, new HashMap<>(), "configId", "hostName")); List<ModelFactory> modelFactories = List.of( - new ConfigChangeActionsModelFactory(Version.fromString("6.1.0"), new MockRestartAction("change", services)), - new ConfigChangeActionsModelFactory(Version.fromString("6.2.0"), new MockRestartAction("other change", services))); + new ConfigChangeActionsModelFactory(Version.fromString("6.1.0"), new VespaRestartAction("change", services)), + new ConfigChangeActionsModelFactory(Version.fromString("6.2.0"), new VespaRestartAction("other change", services))); DeployTester tester = createTester(hosts, modelFactories, prodZone); PrepareResult prepareResult = tester.deployApp("src/test/apps/hosted/", "6.2.0"); diff --git a/container-disc/src/main/java/com/yahoo/container/jdisc/metric/MetricConsumerProviderProvider.java b/container-disc/src/main/java/com/yahoo/container/jdisc/metric/MetricConsumerProviderProvider.java index a3f8819f843..a44650a153d 100644 --- a/container-disc/src/main/java/com/yahoo/container/jdisc/metric/MetricConsumerProviderProvider.java +++ b/container-disc/src/main/java/com/yahoo/container/jdisc/metric/MetricConsumerProviderProvider.java @@ -15,7 +15,6 @@ import com.yahoo.metrics.MetricsPresentationConfig; * * @author bratseth */ -@SuppressWarnings("unused") // Injected public class MetricConsumerProviderProvider implements Provider<MetricConsumerProvider> { private final MetricConsumerProvider provided; diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchive.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchive.java index e6310cc6432..a00992da815 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchive.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchive.java @@ -45,6 +45,11 @@ import static com.yahoo.yolean.Exceptions.uncheck; * The flag files must reside in a 'flags/' root directory containing a directory for each flag name: * {@code ./flags/<flag-id>/*.json} * + * Optionally, there can be an arbitrary number of directories "between" 'flags/' root directory and + * the flag name directory: + * {@code ./flags/onelevel/<flag-id>/*.json} + * {@code ./flags/onelevel/anotherlevel/<flag-id>/*.json} + * * @author bjorncs */ public class SystemFlagsDataArchive { @@ -155,7 +160,7 @@ public class SystemFlagsDataArchive { if (!filename.endsWith(".json")) { throw new IllegalArgumentException(String.format("Only JSON files are allowed in 'flags/' directory (found '%s')", filePath.toString())); } - FlagId directoryDeducedFlagId = new FlagId(filePath.getName(1).toString()); + FlagId directoryDeducedFlagId = new FlagId(filePath.getName(filePath.getNameCount()-2).toString()); FlagData flagData; if (rawData.isBlank()) { flagData = new FlagData(directoryDeducedFlagId); @@ -178,6 +183,13 @@ public class SystemFlagsDataArchive { "\nSee https://git.ouroath.com/vespa/hosted-feature-flags for more info on the JSON syntax"); } } + + if (builder.hasFile(filename, flagData)) { + throw new IllegalArgumentException( + String.format("Flag data file in '%s' contains redundant flag data for id '%s' already set in another directory!", + filePath, flagData.id())); + } + builder.addFile(filename, flagData); } @@ -236,6 +248,10 @@ public class SystemFlagsDataArchive { return this; } + public boolean hasFile(String filename, FlagData data) { + return files.containsKey(data.id()) && files.get(data.id()).containsKey(filename); + } + public SystemFlagsDataArchive build() { Map<FlagId, Map<String, FlagData>> copy = new TreeMap<>(); files.forEach((flagId, map) -> copy.put(flagId, new TreeMap<>(map))); diff --git a/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchiveTest.java b/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchiveTest.java index 4cdbe5241bc..aca991ec637 100644 --- a/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchiveTest.java +++ b/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/systemflags/v1/SystemFlagsDataArchiveTest.java @@ -84,6 +84,19 @@ public class SystemFlagsDataArchiveTest { } @Test + public void supports_multi_level_flags_directory() { + var archive = SystemFlagsDataArchive.fromDirectory(Paths.get("src/test/resources/system-flags-multi-level/")); + assertFlagDataHasValue(archive, MY_TEST_FLAG, mainControllerTarget, "default"); + } + + @Test + public void duplicated_flagdata_is_detected() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("contains redundant flag data for id 'my-test-flag' already set in another directory!"); + var archive = SystemFlagsDataArchive.fromDirectory(Paths.get("src/test/resources/system-flags-multi-level-with-duplicated-flagdata/")); + } + + @Test public void empty_files_are_handled_as_no_flag_data_for_target() { var archive = SystemFlagsDataArchive.fromDirectory(Paths.get("src/test/resources/system-flags/")); assertNoFlagData(archive, FLAG_WITH_EMPTY_DATA, mainControllerTarget); diff --git a/controller-api/src/test/resources/system-flags-multi-level-with-duplicated-flagdata/flags/group-1/my-test-flag/default.json b/controller-api/src/test/resources/system-flags-multi-level-with-duplicated-flagdata/flags/group-1/my-test-flag/default.json new file mode 100644 index 00000000000..5924eb860c0 --- /dev/null +++ b/controller-api/src/test/resources/system-flags-multi-level-with-duplicated-flagdata/flags/group-1/my-test-flag/default.json @@ -0,0 +1,8 @@ +{ + "id" : "my-test-flag", + "rules" : [ + { + "value" : "default" + } + ] +}
\ No newline at end of file diff --git a/controller-api/src/test/resources/system-flags-multi-level-with-duplicated-flagdata/flags/group-2/my-test-flag/default.json b/controller-api/src/test/resources/system-flags-multi-level-with-duplicated-flagdata/flags/group-2/my-test-flag/default.json new file mode 100644 index 00000000000..5924eb860c0 --- /dev/null +++ b/controller-api/src/test/resources/system-flags-multi-level-with-duplicated-flagdata/flags/group-2/my-test-flag/default.json @@ -0,0 +1,8 @@ +{ + "id" : "my-test-flag", + "rules" : [ + { + "value" : "default" + } + ] +}
\ No newline at end of file diff --git a/controller-api/src/test/resources/system-flags-multi-level/flags/group-1/my-test-flag/default.json b/controller-api/src/test/resources/system-flags-multi-level/flags/group-1/my-test-flag/default.json new file mode 100644 index 00000000000..5924eb860c0 --- /dev/null +++ b/controller-api/src/test/resources/system-flags-multi-level/flags/group-1/my-test-flag/default.json @@ -0,0 +1,8 @@ +{ + "id" : "my-test-flag", + "rules" : [ + { + "value" : "default" + } + ] +}
\ No newline at end of file diff --git a/controller-api/src/test/resources/system-flags-multi-level/flags/group-2/my-other-test-flag/default.json b/controller-api/src/test/resources/system-flags-multi-level/flags/group-2/my-other-test-flag/default.json new file mode 100644 index 00000000000..e30485b755c --- /dev/null +++ b/controller-api/src/test/resources/system-flags-multi-level/flags/group-2/my-other-test-flag/default.json @@ -0,0 +1,8 @@ +{ + "id" : "my-other-test-flag", + "rules" : [ + { + "value" : "default" + } + ] +}
\ No newline at end of file diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/zone/v2/ZoneApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/zone/v2/ZoneApiHandler.java index 36a46f72d34..4a245fb3555 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/zone/v2/ZoneApiHandler.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/zone/v2/ZoneApiHandler.java @@ -116,11 +116,9 @@ public class ZoneApiHandler extends AuditLoggingRequestHandler { boolean useConfigServerVip = Flags.USE_CONFIG_SERVER_VIP.bindTo(flagSource) .with(FetchVector.Dimension.ZONE_ID, zoneId.value()).value(); - // TODO: Still need to hardcode AWS since flag cannot be set until flag has been rolled out - if (zoneId.region().value().startsWith("aws-") || useConfigServerVip) { - return ProxyRequest.tryOne(zoneRegistry.getConfigServerVipUri(zoneId), path, request); - } - return ProxyRequest.tryAll(zoneRegistry.getConfigServerUris(zoneId), path, request); + return useConfigServerVip + ? ProxyRequest.tryOne(zoneRegistry.getConfigServerVipUri(zoneId), path, request) + : ProxyRequest.tryAll(zoneRegistry.getConfigServerUris(zoneId), path, request); } } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/zone/v2/ZoneApiTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/zone/v2/ZoneApiTest.java index a7adac7f89d..61915860d7c 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/zone/v2/ZoneApiTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/zone/v2/ZoneApiTest.java @@ -81,7 +81,7 @@ public class ZoneApiTest extends ControllerContainerTest { tester.assertResponse(operatorRequest("http://localhost:8080/zone/v2/dev/aws-us-north-2/nodes/v2/node/node1", "{\"currentRestartGeneration\": 1}", Method.PATCH), "ok"); - assertLastRequest(ZoneId.from("dev", "aws-us-north-2"), 1, "PATCH"); + assertLastRequest(ZoneId.from("dev", "aws-us-north-2"), 2, "PATCH"); assertEquals("{\"currentRestartGeneration\": 1}", proxy.lastRequestBody().get()); assertFalse("Actions are logged to audit log", tester.controller().auditLogger().readLog().entries().isEmpty()); diff --git a/eval/CMakeLists.txt b/eval/CMakeLists.txt index 33bd098975a..03a19b8c2c4 100644 --- a/eval/CMakeLists.txt +++ b/eval/CMakeLists.txt @@ -34,6 +34,7 @@ vespa_define_module( src/tests/gp/ponder_nov2017 src/tests/instruction/generic_join src/tests/instruction/generic_rename + src/tests/tensor/default_value_builder_factory src/tests/tensor/dense_add_dimension_optimizer src/tests/tensor/dense_dimension_combiner src/tests/tensor/dense_dot_product_function diff --git a/eval/src/tests/eval/value_codec/value_codec_test.cpp b/eval/src/tests/eval/value_codec/value_codec_test.cpp index 00f37e1f87a..01071b754fb 100644 --- a/eval/src/tests/eval/value_codec/value_codec_test.cpp +++ b/eval/src/tests/eval/value_codec/value_codec_test.cpp @@ -87,7 +87,7 @@ struct TensorExample { virtual void encode_default(nbostream &dst) const = 0; virtual void encode_with_double(nbostream &dst) const = 0; virtual void encode_with_float(nbostream &dst) const = 0; - void verify_encode_decode() const { + void verify_encode_decode(bool is_dense) const { nbostream expect_default; nbostream expect_double; nbostream expect_float; @@ -98,10 +98,15 @@ struct TensorExample { nbostream data_float; encode_value(*make_tensor(false), data_double); encode_value(*make_tensor(true), data_float); - EXPECT_EQ(Memory(data_double.peek(), data_double.size()), - Memory(expect_default.peek(), expect_default.size())); - EXPECT_EQ(Memory(data_float.peek(), data_float.size()), - Memory(expect_float.peek(), expect_float.size())); + if (is_dense) { + EXPECT_EQ(Memory(data_double.peek(), data_double.size()), + Memory(expect_default.peek(), expect_default.size())); + EXPECT_EQ(Memory(data_float.peek(), data_float.size()), + Memory(expect_float.peek(), expect_float.size())); + } else { + EXPECT_EQ(spec_from_value(*decode_value(data_double, factory)), make_spec(false)); + EXPECT_EQ(spec_from_value(*decode_value(data_float, factory)), make_spec(true)); + } EXPECT_EQ(spec_from_value(*decode_value(expect_default, factory)), make_spec(false)); EXPECT_EQ(spec_from_value(*decode_value(expect_double, factory)), make_spec(false)); EXPECT_EQ(spec_from_value(*decode_value(expect_float, factory)), make_spec(true)); @@ -155,7 +160,7 @@ struct SparseTensorExample : TensorExample { TEST(ValueCodecTest, sparse_tensors_can_be_encoded_and_decoded) { SparseTensorExample f1; - f1.verify_encode_decode(); + f1.verify_encode_decode(false); } //----------------------------------------------------------------------------- @@ -205,7 +210,7 @@ struct DenseTensorExample : TensorExample { TEST(ValueCodecTest, dense_tensors_can_be_encoded_and_decoded) { DenseTensorExample f1; - f1.verify_encode_decode(); + f1.verify_encode_decode(true); } //----------------------------------------------------------------------------- @@ -263,7 +268,7 @@ struct MixedTensorExample : TensorExample { TEST(ValueCodecTest, mixed_tensors_can_be_encoded_and_decoded) { MixedTensorExample f1; - f1.verify_encode_decode(); + f1.verify_encode_decode(false); } //----------------------------------------------------------------------------- diff --git a/eval/src/tests/instruction/generic_join/generic_join_test.cpp b/eval/src/tests/instruction/generic_join/generic_join_test.cpp index 53df23d77be..4821bf092da 100644 --- a/eval/src/tests/instruction/generic_join/generic_join_test.cpp +++ b/eval/src/tests/instruction/generic_join/generic_join_test.cpp @@ -41,13 +41,32 @@ std::vector<Layout> join_layouts = { float_cells({x({"a","b","c"}),y(5)}), float_cells({y(5),z({"i","j","k","l"})}) }; -TensorSpec simple_tensor_join(const TensorSpec &a, const TensorSpec &b, join_fun_t function) { - Stash stash; - const auto &engine = SimpleTensorEngine::ref(); - auto lhs = engine.from_spec(a); - auto rhs = engine.from_spec(b); - const auto &result = engine.join(*lhs, *rhs, function, stash); - return engine.to_spec(result); +bool join_address(const TensorSpec::Address &a, const TensorSpec::Address &b, TensorSpec::Address &addr) { + for (const auto &dim_a: a) { + auto pos_b = b.find(dim_a.first); + if ((pos_b != b.end()) && !(pos_b->second == dim_a.second)) { + return false; + } + addr.insert_or_assign(dim_a.first, dim_a.second); + } + return true; +} + +TensorSpec reference_join(const TensorSpec &a, const TensorSpec &b, join_fun_t function) { + ValueType res_type = ValueType::join(ValueType::from_spec(a.type()), ValueType::from_spec(b.type())); + EXPECT_FALSE(res_type.is_error()); + TensorSpec result(res_type.to_spec()); + for (const auto &cell_a: a.cells()) { + for (const auto &cell_b: b.cells()) { + TensorSpec::Address addr; + if (join_address(cell_a.first, cell_b.first, addr) && + join_address(cell_b.first, cell_a.first, addr)) + { + result.add(addr, function(cell_a.second, cell_b.second)); + } + } + } + return result; } TensorSpec perform_generic_join(const TensorSpec &a, const TensorSpec &b, join_fun_t function) { @@ -109,7 +128,7 @@ TEST(GenericJoinTest, generic_join_works_for_simple_values) { TensorSpec rhs = spec(join_layouts[i + 1], Div16(N())); for (auto fun: {operation::Add::f, operation::Sub::f, operation::Mul::f, operation::Div::f}) { SCOPED_TRACE(fmt("\n===\nLHS: %s\nRHS: %s\n===\n", lhs.to_string().c_str(), rhs.to_string().c_str())); - auto expect = simple_tensor_join(lhs, rhs, fun); + auto expect = reference_join(lhs, rhs, fun); auto actual = perform_generic_join(lhs, rhs, fun); EXPECT_EQ(actual, expect); } diff --git a/eval/src/tests/instruction/generic_rename/generic_rename_test.cpp b/eval/src/tests/instruction/generic_rename/generic_rename_test.cpp index 1139d35d847..f61899e4dda 100644 --- a/eval/src/tests/instruction/generic_rename/generic_rename_test.cpp +++ b/eval/src/tests/instruction/generic_rename/generic_rename_test.cpp @@ -87,12 +87,28 @@ TEST(GenericRenameTest, sparse_rename_plan_can_be_created) { EXPECT_EQ(plan.output_dimensions, expect); } -TensorSpec simple_tensor_rename(const TensorSpec &a, const FromTo &ft) { - Stash stash; - const auto &engine = SimpleTensorEngine::ref(); - auto lhs = engine.from_spec(a); - const auto &result = engine.rename(*lhs, ft.from, ft.to, stash); - return engine.to_spec(result); +vespalib::string rename_dimension(const vespalib::string &name, const FromTo &ft) { + assert(ft.from.size() == ft.to.size()); + for (size_t i = 0; i < ft.from.size(); ++i) { + if (name == ft.from[i]) { + return ft.to[i]; + } + } + return name; +} + +TensorSpec reference_rename(const TensorSpec &a, const FromTo &ft) { + ValueType res_type = ValueType::from_spec(a.type()).rename(ft.from, ft.to); + EXPECT_FALSE(res_type.is_error()); + TensorSpec result(res_type.to_spec()); + for (const auto &cell: a.cells()) { + TensorSpec::Address addr; + for (const auto &dim: cell.first) { + addr.insert_or_assign(rename_dimension(dim.first, ft), dim.second); + } + result.add(addr, cell.second); + } + return result; } TensorSpec perform_generic_rename(const TensorSpec &a, const ValueType &res_type, @@ -115,7 +131,7 @@ void test_generic_rename(const ValueBuilderFactory &factory) { if (renamed_type.is_error()) continue; // printf("type %s -> %s\n", lhs_type.to_spec().c_str(), renamed_type.to_spec().c_str()); SCOPED_TRACE(fmt("\n===\nLHS: %s\n===\n", lhs.to_string().c_str())); - auto expect = simple_tensor_rename(lhs, from_to); + auto expect = reference_rename(lhs, from_to); auto actual = perform_generic_rename(lhs, renamed_type, from_to, factory); EXPECT_EQ(actual, expect); } diff --git a/eval/src/tests/tensor/default_value_builder_factory/CMakeLists.txt b/eval/src/tests/tensor/default_value_builder_factory/CMakeLists.txt new file mode 100644 index 00000000000..cd7f552ec28 --- /dev/null +++ b/eval/src/tests/tensor/default_value_builder_factory/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(eval_default_value_builder_factory_test_app TEST + SOURCES + default_value_builder_factory_test.cpp + DEPENDS + vespaeval + GTest::GTest +) +vespa_add_test(NAME eval_default_value_builder_factory_test_app COMMAND eval_default_value_builder_factory_test_app ) diff --git a/eval/src/tests/tensor/default_value_builder_factory/default_value_builder_factory_test.cpp b/eval/src/tests/tensor/default_value_builder_factory/default_value_builder_factory_test.cpp new file mode 100644 index 00000000000..d180b3f6517 --- /dev/null +++ b/eval/src/tests/tensor/default_value_builder_factory/default_value_builder_factory_test.cpp @@ -0,0 +1,61 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/eval/eval/value.h> +#include <vespa/eval/eval/value_codec.h> +#include <vespa/eval/eval/tensor_spec.h> +#include <vespa/eval/tensor/default_value_builder_factory.h> +#include <vespa/eval/tensor/mixed/packed_mixed_tensor.h> +#include <vespa/eval/tensor/sparse/sparse_tensor_value.h> +#include <vespa/eval/tensor/dense/dense_tensor.h> +#include <vespa/vespalib/gtest/gtest.h> + +using namespace vespalib; +using namespace vespalib::eval; +using namespace vespalib::tensor; +using namespace vespalib::eval::packed_mixed_tensor; + +Value::UP v_of(const TensorSpec &spec) { + return value_from_spec(spec, DefaultValueBuilderFactory::get()); +} + +TEST(DefaultValueBuilderFactoryTest, all_built_value_types_are_correct) { + auto dbl = v_of(TensorSpec("double").add({}, 3.0)); + auto trivial = v_of(TensorSpec("tensor(x[1])").add({{"x",0}}, 7.0)); + auto dense = v_of(TensorSpec("tensor<float>(x[2],y[3])").add({{"x",1},{"y",2}}, 17.0)); + auto sparse = v_of(TensorSpec("tensor(x{},y{})").add({{"x","foo"},{"y","bar"}}, 31.0)); + auto mixed = v_of(TensorSpec("tensor<float>(x[2],y{})").add({{"x",1},{"y","quux"}}, 42.0)); + + EXPECT_TRUE(dynamic_cast<DoubleValue *>(dbl.get())); + EXPECT_TRUE(dynamic_cast<DenseTensorView *>(trivial.get())); + EXPECT_TRUE(dynamic_cast<DenseTensorView *>(dense.get())); + EXPECT_TRUE(dynamic_cast<SparseTensorValue<double> *>(sparse.get())); + EXPECT_TRUE(dynamic_cast<PackedMixedTensor *>(mixed.get())); + + EXPECT_EQ(dbl->as_double(), 3.0); + EXPECT_EQ(trivial->cells().typify<double>()[0], 7.0); + EXPECT_EQ(dense->cells().typify<float>()[5], 17.0); + EXPECT_EQ(sparse->cells().typify<double>()[0], 31.0); + EXPECT_EQ(mixed->cells().typify<float>()[1], 42.0); + + stringref y_look = "bar"; + stringref x_res = "xxx"; + auto view = sparse->index().create_view({1}); + view->lookup({&y_look}); + size_t ss = 12345; + bool br = view->next_result({&x_res}, ss); + EXPECT_TRUE(br); + EXPECT_EQ(ss, 0); + EXPECT_EQ(x_res, "foo"); + br = view->next_result({&x_res}, ss); + EXPECT_FALSE(br); + + ss = 12345; + view = mixed->index().create_view({}); + view->lookup({}); + br = view->next_result({&x_res}, ss); + EXPECT_TRUE(br); + EXPECT_EQ(ss, 0); + EXPECT_EQ(x_res, "quux"); +} + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/eval/src/tests/tensor/instruction_benchmark/instruction_benchmark.cpp b/eval/src/tests/tensor/instruction_benchmark/instruction_benchmark.cpp index 8bb227a7e85..ccc71fdd0f6 100644 --- a/eval/src/tests/tensor/instruction_benchmark/instruction_benchmark.cpp +++ b/eval/src/tests/tensor/instruction_benchmark/instruction_benchmark.cpp @@ -29,7 +29,7 @@ #include <vespa/eval/eval/operation.h> #include <vespa/eval/eval/tensor_function.h> #include <vespa/eval/tensor/default_tensor_engine.h> -#include <vespa/eval/tensor/mixed/packed_mixed_tensor_builder_factory.h> +#include <vespa/eval/tensor/default_value_builder_factory.h> #include <vespa/vespalib/util/benchmark_timer.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/stash.h> @@ -94,13 +94,13 @@ struct EngineImpl : Impl { EngineImpl simple_tensor_engine_impl(" [SimpleTensorEngine]", SimpleTensorEngine::ref()); EngineImpl default_tensor_engine_impl("[DefaultTensorEngine]", DefaultTensorEngine::ref()); ValueImpl simple_value_impl(" [SimpleValue]", SimpleValueBuilderFactory::get()); -ValueImpl packed_mixed_tensor_impl(" [PackedMixedTensor]", PackedMixedTensorBuilderFactory::get()); +ValueImpl default_tensor_value_impl(" [Adaptive Value]", DefaultValueBuilderFactory::get()); double budget = 5.0; std::vector<CREF<Impl>> impl_list = {simple_tensor_engine_impl, default_tensor_engine_impl, simple_value_impl, - packed_mixed_tensor_impl}; + default_tensor_value_impl}; //----------------------------------------------------------------------------- @@ -185,44 +185,45 @@ struct D { return ValueType::Dimension(name, size); } } - std::pair<vespalib::string,TensorSpec::Label> operator()(size_t idx) const { + TensorSpec::Label operator()(size_t idx) const { if (mapped) { - return std::make_pair(name, TensorSpec::Label(fmt("label_%zu", idx))); + return TensorSpec::Label(fmt("label_%zu", idx)); } else { - return std::make_pair(name, TensorSpec::Label(idx)); + return TensorSpec::Label(idx); } } }; -TensorSpec make_vector(const D &d1, double seq) { - auto type = ValueType::tensor_type({d1}, ValueType::CellType::FLOAT); - TensorSpec spec(type.to_spec()); - for (size_t i = 0, idx1 = 0; i < d1.size; ++i, idx1 += d1.stride, seq += 1.0) { - spec.add({d1(idx1)}, seq); - } - return spec; +void add_cells(TensorSpec &spec, double &seq, TensorSpec::Address addr) { + spec.add(addr, seq); + seq += 1.0; } -TensorSpec make_cube(const D &d1, const D &d2, const D &d3, double seq) { - auto type = ValueType::tensor_type({d1, d2, d3}, ValueType::CellType::FLOAT); - TensorSpec spec(type.to_spec()); - for (size_t i = 0, idx1 = 0; i < d1.size; ++i, idx1 += d1.stride) { - for (size_t j = 0, idx2 = 0; j < d2.size; ++j, idx2 += d2.stride) { - for (size_t k = 0, idx3 = 0; k < d3.size; ++k, idx3 += d3.stride, seq += 1.0) { - spec.add({d1(idx1), d2(idx2), d3(idx3)}, seq); - } - } +template <typename ...Ds> void add_cells(TensorSpec &spec, double &seq, TensorSpec::Address addr, const D &d, const Ds &...ds) { + for (size_t i = 0, idx = 0; i < d.size; ++i, idx += d.stride) { + addr.insert_or_assign(d.name, d(idx)); + add_cells(spec, seq, addr, ds...); } +} + +template <typename ...Ds> TensorSpec make_spec(double seq, const Ds &...ds) { + TensorSpec spec(ValueType::tensor_type({ds...}, ValueType::CellType::FLOAT).to_spec()); + add_cells(spec, seq, TensorSpec::Address(), ds...); return spec; } +TensorSpec make_vector(const D &d1, double seq) { return make_spec(seq, d1); } +TensorSpec make_cube(const D &d1, const D &d2, const D &d3, double seq) { return make_spec(seq, d1, d2, d3); } + //----------------------------------------------------------------------------- TEST(MakeInputTest, print_some_test_input) { + auto number = make_spec(5.0); auto sparse = make_vector(D::map("x", 5, 3), 1.0); auto dense = make_vector(D::idx("x", 5), 10.0); auto mixed = make_cube(D::map("x", 3, 7), D::idx("y", 2), D::idx("z", 2), 100.0); fprintf(stderr, "--------------------------------------------------------\n"); + fprintf(stderr, "simple number: %s\n", number.to_string().c_str()); fprintf(stderr, "sparse vector: %s\n", sparse.to_string().c_str()); fprintf(stderr, "dense vector: %s\n", dense.to_string().c_str()); fprintf(stderr, "mixed cube: %s\n", mixed.to_string().c_str()); @@ -232,8 +233,8 @@ TEST(MakeInputTest, print_some_test_input) { //----------------------------------------------------------------------------- TEST(NumberJoin, plain_op2) { - auto lhs = TensorSpec("double").add({}, 2.0); - auto rhs = TensorSpec("double").add({}, 3.0); + auto lhs = make_spec(2.0); + auto rhs = make_spec(3.0); benchmark_join("simple numbers multiply", lhs, rhs, operation::Mul::f); } diff --git a/eval/src/vespa/eval/eval/CMakeLists.txt b/eval/src/vespa/eval/eval/CMakeLists.txt index 84cebe8cfd0..d108c516e73 100644 --- a/eval/src/vespa/eval/eval/CMakeLists.txt +++ b/eval/src/vespa/eval/eval/CMakeLists.txt @@ -6,6 +6,7 @@ vespa_add_library(eval_eval OBJECT call_nodes.cpp compile_tensor_function.cpp delete_node.cpp + double_value_builder.cpp fast_forest.cpp function.cpp gbdt.cpp diff --git a/eval/src/vespa/eval/eval/double_value_builder.cpp b/eval/src/vespa/eval/eval/double_value_builder.cpp new file mode 100644 index 00000000000..24215104cee --- /dev/null +++ b/eval/src/vespa/eval/eval/double_value_builder.cpp @@ -0,0 +1,9 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "double_value_builder.h" + +namespace vespalib::eval { + +DoubleValueBuilder::~DoubleValueBuilder() = default; + +} diff --git a/eval/src/vespa/eval/eval/double_value_builder.h b/eval/src/vespa/eval/eval/double_value_builder.h new file mode 100644 index 00000000000..ba85d5838ad --- /dev/null +++ b/eval/src/vespa/eval/eval/double_value_builder.h @@ -0,0 +1,29 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "value.h" + +namespace vespalib::eval { + +/** + * A trivial builder for DoubleValue objects + **/ +class DoubleValueBuilder : public ValueBuilder<double> +{ +private: + double _value; +public: + DoubleValueBuilder() : _value(0.0) {} + ~DoubleValueBuilder() override; + ArrayRef<double> + add_subspace(const std::vector<vespalib::stringref> &) override { + return ArrayRef<double>(&_value, 1); + } + std::unique_ptr<Value> + build(std::unique_ptr<ValueBuilder<double>>) override { + return std::make_unique<DoubleValue>(_value); + } +}; + +} // namespace diff --git a/eval/src/vespa/eval/tensor/CMakeLists.txt b/eval/src/vespa/eval/tensor/CMakeLists.txt index bc0a4d340b8..810dfd6d0b3 100644 --- a/eval/src/vespa/eval/tensor/CMakeLists.txt +++ b/eval/src/vespa/eval/tensor/CMakeLists.txt @@ -2,6 +2,7 @@ vespa_add_library(eval_tensor OBJECT SOURCES default_tensor_engine.cpp + default_value_builder_factory.cpp tensor.cpp tensor_address.cpp tensor_apply.cpp diff --git a/eval/src/vespa/eval/tensor/default_value_builder_factory.cpp b/eval/src/vespa/eval/tensor/default_value_builder_factory.cpp new file mode 100644 index 00000000000..74fd371e9a0 --- /dev/null +++ b/eval/src/vespa/eval/tensor/default_value_builder_factory.cpp @@ -0,0 +1,57 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "default_value_builder_factory.h" +#include <vespa/vespalib/util/typify.h> +#include <vespa/eval/eval/value.h> +#include <vespa/eval/eval/double_value_builder.h> +#include <vespa/eval/tensor/dense/dense_tensor_value_builder.h> +#include <vespa/eval/tensor/mixed/packed_mixed_tensor_builder.h> +#include <vespa/eval/tensor/sparse/sparse_tensor_value_builder.h> + +using namespace vespalib::eval; + +namespace vespalib::tensor { + +//----------------------------------------------------------------------------- + +namespace { + +struct CreateDefaultValueBuilderBase { + template <typename T> static std::unique_ptr<ValueBuilderBase> invoke(const ValueType &type, + size_t num_mapped_dims, + size_t subspace_size, + size_t expected_subspaces) + { + assert(check_cell_type<T>(type.cell_type())); + if (type.is_double()) { + return std::make_unique<DoubleValueBuilder>(); + } + if (num_mapped_dims == 0) { + return std::make_unique<DenseTensorValueBuilder<T>>(type, subspace_size); + } + if (subspace_size == 1) { + return std::make_unique<SparseTensorValueBuilder<T>>(type, num_mapped_dims, expected_subspaces); + } + return std::make_unique<packed_mixed_tensor::PackedMixedTensorBuilder<T>>(type, num_mapped_dims, subspace_size, expected_subspaces); + } +}; + +} // namespace <unnamed> + +//----------------------------------------------------------------------------- + +DefaultValueBuilderFactory::DefaultValueBuilderFactory() = default; +DefaultValueBuilderFactory DefaultValueBuilderFactory::_factory; + +std::unique_ptr<ValueBuilderBase> +DefaultValueBuilderFactory::create_value_builder_base(const ValueType &type, + size_t num_mapped_dims, + size_t subspace_size, + size_t expected_subspaces) const +{ + return typify_invoke<1,TypifyCellType,CreateDefaultValueBuilderBase>(type.cell_type(), type, num_mapped_dims, subspace_size, expected_subspaces); +} + +//----------------------------------------------------------------------------- + +} diff --git a/eval/src/vespa/eval/tensor/default_value_builder_factory.h b/eval/src/vespa/eval/tensor/default_value_builder_factory.h new file mode 100644 index 00000000000..67b1391ed78 --- /dev/null +++ b/eval/src/vespa/eval/tensor/default_value_builder_factory.h @@ -0,0 +1,24 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/eval/eval/value.h> +#include <vespa/eval/eval/value_type.h> + +namespace vespalib::tensor { + +/** + * A factory that can generate ValueBuilder + * objects appropriate for the requested type. + */ +struct DefaultValueBuilderFactory : eval::ValueBuilderFactory { +private: + DefaultValueBuilderFactory(); + static DefaultValueBuilderFactory _factory; + ~DefaultValueBuilderFactory() override {} +protected: + std::unique_ptr<eval::ValueBuilderBase> create_value_builder_base(const eval::ValueType &type, + size_t num_mapped_in, size_t subspace_size_in, size_t expect_subspaces) const override; +public: + static const DefaultValueBuilderFactory &get() { return _factory; } +}; + +} // namespace diff --git a/eval/src/vespa/eval/tensor/dense/CMakeLists.txt b/eval/src/vespa/eval/tensor/dense/CMakeLists.txt index b4e849a1dde..3a41fed132e 100644 --- a/eval/src/vespa/eval/tensor/dense/CMakeLists.txt +++ b/eval/src/vespa/eval/tensor/dense/CMakeLists.txt @@ -26,6 +26,7 @@ vespa_add_library(eval_tensor_dense OBJECT dense_tensor_modify.cpp dense_tensor_peek_function.cpp dense_tensor_reduce.cpp + dense_tensor_value_builder.cpp dense_tensor_view.cpp dense_xw_product_function.cpp index_lookup_table.cpp diff --git a/eval/src/vespa/eval/tensor/dense/dense_tensor_value_builder.cpp b/eval/src/vespa/eval/tensor/dense/dense_tensor_value_builder.cpp new file mode 100644 index 00000000000..6a5bb33ca06 --- /dev/null +++ b/eval/src/vespa/eval/tensor/dense/dense_tensor_value_builder.cpp @@ -0,0 +1,21 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "dense_tensor_value_builder.h" + +namespace vespalib::tensor { + +template<typename T> +DenseTensorValueBuilder<T>::DenseTensorValueBuilder(const eval::ValueType &type, + size_t subspace_size_in) + : _type(type), + _cells(subspace_size_in) +{ +} + +template<typename T> +DenseTensorValueBuilder<T>::~DenseTensorValueBuilder() = default; + +template class DenseTensorValueBuilder<float>; +template class DenseTensorValueBuilder<double>; + +} diff --git a/eval/src/vespa/eval/tensor/dense/dense_tensor_value_builder.h b/eval/src/vespa/eval/tensor/dense/dense_tensor_value_builder.h new file mode 100644 index 00000000000..c420be2c582 --- /dev/null +++ b/eval/src/vespa/eval/tensor/dense/dense_tensor_value_builder.h @@ -0,0 +1,31 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "dense_tensor.h" + +namespace vespalib::tensor { + +/** + * A builder for DenseTensor objects + **/ +template<typename T> +class DenseTensorValueBuilder : public eval::ValueBuilder<T> +{ +private: + eval::ValueType _type; + std::vector<T> _cells; +public: + DenseTensorValueBuilder(const eval::ValueType &type, size_t subspace_size_in); + ~DenseTensorValueBuilder() override; + ArrayRef<T> + add_subspace(const std::vector<vespalib::stringref> &) override { + return _cells; + } + std::unique_ptr<eval::Value> + build(std::unique_ptr<eval::ValueBuilder<T>>) override { + return std::make_unique<DenseTensor<T>>(std::move(_type), std::move(_cells)); + } +}; + +} // namespace diff --git a/eval/src/vespa/eval/tensor/sparse/CMakeLists.txt b/eval/src/vespa/eval/tensor/sparse/CMakeLists.txt index a25d2abb477..91c609a59b7 100644 --- a/eval/src/vespa/eval/tensor/sparse/CMakeLists.txt +++ b/eval/src/vespa/eval/tensor/sparse/CMakeLists.txt @@ -11,4 +11,6 @@ vespa_add_library(eval_tensor_sparse OBJECT sparse_tensor_match.cpp sparse_tensor_modify.cpp sparse_tensor_remove.cpp + sparse_tensor_value.cpp + sparse_tensor_value_builder.cpp ) diff --git a/eval/src/vespa/eval/tensor/sparse/sparse_tensor_address_builder.h b/eval/src/vespa/eval/tensor/sparse/sparse_tensor_address_builder.h index e053caf8604..32b9b57fb26 100644 --- a/eval/src/vespa/eval/tensor/sparse/sparse_tensor_address_builder.h +++ b/eval/src/vespa/eval/tensor/sparse/sparse_tensor_address_builder.h @@ -26,9 +26,10 @@ private: protected: void append(vespalib::stringref str) { - for (size_t i(0); i < str.size() + 1; i++) { + for (size_t i(0); i < str.size(); i++) { _address.push_back_fast(str[i]); } + _address.push_back_fast('\0'); } void ensure_room(size_t additional) { if (_address.capacity() < (_address.size() + additional)) { diff --git a/eval/src/vespa/eval/tensor/sparse/sparse_tensor_value.cpp b/eval/src/vespa/eval/tensor/sparse/sparse_tensor_value.cpp new file mode 100644 index 00000000000..62e3c786262 --- /dev/null +++ b/eval/src/vespa/eval/tensor/sparse/sparse_tensor_value.cpp @@ -0,0 +1,260 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "sparse_tensor_value.h" +#include "sparse_tensor_address_builder.h" +#include "sparse_tensor_address_decoder.h" + +#include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/stllike/hash_map_equal.hpp> + +#include <vespa/log/log.h> +LOG_SETUP(".eval.tensor.sparse.sparse_tensor_value"); + +namespace vespalib::tensor { + +using SubspaceMap = SparseTensorValueIndex::SubspaceMap; +using View = vespalib::eval::Value::Index::View; + +namespace { + +void copyMap(SubspaceMap &map, const SubspaceMap &map_in, Stash &to_stash) { + // copy the exact hashtable structure: + map = map_in; + // copy the actual contents of the addresses, + // and update the pointers inside the hashtable + // keys so they point to our copy: + for (auto & kv : map) { + SparseTensorAddressRef oldRef = kv.first; + SparseTensorAddressRef newRef(oldRef, to_stash); + kv.first = newRef; + } +} + +template<typename T> +size_t needed_memory_for(const SubspaceMap &map, ConstArrayRef<T> cells) { + size_t needs = cells.size() * sizeof(T); + for (const auto & kv : map) { + needs += kv.first.size(); + } + return needs; +} + +//----------------------------------------------------------------------------- + +class SparseTensorValueView : public View +{ +private: + const SubspaceMap ↦ + SubspaceMap::const_iterator iter; + const std::vector<size_t> lookup_dims; + std::vector<vespalib::stringref> lookup_refs; +public: + SparseTensorValueView(const SubspaceMap & map_in, + const std::vector<size_t> &dims) + : map(map_in), iter(map.end()), lookup_dims(dims), lookup_refs() {} + ~SparseTensorValueView(); + void lookup(const std::vector<const vespalib::stringref*> &addr) override; + bool next_result(const std::vector<vespalib::stringref*> &addr_out, size_t &idx_out) override; +}; + +SparseTensorValueView::~SparseTensorValueView() = default; + +void +SparseTensorValueView::lookup(const std::vector<const vespalib::stringref*> &addr) +{ + lookup_refs.clear(); + for (auto ptr : addr) { + lookup_refs.push_back(*ptr); + } + iter = map.begin(); + +} + +bool +SparseTensorValueView::next_result(const std::vector<vespalib::stringref*> &addr_out, size_t &idx_out) +{ + size_t total_dims = lookup_refs.size() + addr_out.size(); + while (iter != map.end()) { + const auto & ref = iter->first; + SparseTensorAddressDecoder decoder(ref); + idx_out = iter->second; + ++iter; + bool couldmatch = true; + size_t vd_idx = 0; + size_t ao_idx = 0; + for (size_t i = 0; i < total_dims; ++i) { + const auto label = decoder.decodeLabel(); + if (vd_idx < lookup_dims.size()) { + size_t next_view_dim = lookup_dims[vd_idx]; + if (i == next_view_dim) { + if (label == lookup_refs[vd_idx]) { + // match in this dimension + ++vd_idx; + continue; + } else { + // does not match + couldmatch = false; + break; + } + } + } + // not a view dimension: + *addr_out[ao_idx] = label; + ++ao_idx; + } + if (couldmatch) { + assert(vd_idx == lookup_dims.size()); + assert(ao_idx == addr_out.size()); + return true; + } + } + return false; +} + +//----------------------------------------------------------------------------- + +class SparseTensorValueLookup : public View +{ +private: + const SubspaceMap ↦ + SubspaceMap::const_iterator iter; +public: + SparseTensorValueLookup(const SubspaceMap & map_in) : map(map_in), iter(map.end()) {} + ~SparseTensorValueLookup(); + void lookup(const std::vector<const vespalib::stringref*> &addr) override; + bool next_result(const std::vector<vespalib::stringref*> &addr_out, size_t &idx_out) override; +}; + +SparseTensorValueLookup::~SparseTensorValueLookup() = default; + +void +SparseTensorValueLookup::lookup(const std::vector<const vespalib::stringref*> &addr) +{ + SparseTensorAddressBuilder builder; + for (const auto & label : addr) { + builder.add(*label); + } + auto ref = builder.getAddressRef(); + iter = map.find(ref); +} + +bool +SparseTensorValueLookup::next_result(const std::vector<vespalib::stringref*> &, size_t &idx_out) +{ + if (iter != map.end()) { + idx_out = iter->second; + iter = map.end(); + return true; + } + return false; +} + +//----------------------------------------------------------------------------- + +class SparseTensorValueAllMappings : public View +{ +private: + const SubspaceMap ↦ + SubspaceMap::const_iterator iter; +public: + SparseTensorValueAllMappings(const SubspaceMap & map_in) : map(map_in), iter(map.end()) {} + ~SparseTensorValueAllMappings(); + void lookup(const std::vector<const vespalib::stringref*> &addr) override; + bool next_result(const std::vector<vespalib::stringref*> &addr_out, size_t &idx_out) override; +}; + +SparseTensorValueAllMappings::~SparseTensorValueAllMappings() = default; + +void +SparseTensorValueAllMappings::lookup(const std::vector<const vespalib::stringref*> &) +{ + iter = map.begin(); +} + +bool +SparseTensorValueAllMappings::next_result(const std::vector<vespalib::stringref*> &addr_out, size_t &idx_out) +{ + if (iter != map.end()) { + const auto & ref = iter->first; + idx_out = iter->second; + ++iter; + SparseTensorAddressDecoder decoder(ref); + for (const auto ptr : addr_out) { + const auto label = decoder.decodeLabel(); + *ptr = label; + } + return true; + } + return false; +} + +} // namespace <unnamed> + +//----------------------------------------------------------------------------- + +SparseTensorValueIndex::SparseTensorValueIndex(size_t num_mapped_in) + : _stash(), _map(), _num_mapped_dims(num_mapped_in) {} + +SparseTensorValueIndex::SparseTensorValueIndex(const SparseTensorValueIndex & index_in) + : _stash(), _map(), _num_mapped_dims(index_in._num_mapped_dims) +{ + copyMap(_map, index_in._map, _stash); +} + +SparseTensorValueIndex::~SparseTensorValueIndex() = default; + +size_t SparseTensorValueIndex::size() const { + return _map.size(); +} + +std::unique_ptr<View> +SparseTensorValueIndex::create_view(const std::vector<size_t> &dims) const +{ + if (dims.size() == _num_mapped_dims) { + return std::make_unique<SparseTensorValueLookup>(_map); + } + if (dims.size() == 0) { + return std::make_unique<SparseTensorValueAllMappings>(_map); + } + return std::make_unique<SparseTensorValueView>(_map, dims); +} + +void +SparseTensorValueIndex::add_subspace(SparseTensorAddressRef tmp_ref, size_t idx) +{ + SparseTensorAddressRef ref(tmp_ref, _stash); + assert(_map.find(ref) == _map.end()); + assert(_map.size() == idx); + _map[ref] = idx; +} + +//----------------------------------------------------------------------------- + +template<typename T> +SparseTensorValue<T>::SparseTensorValue(const eval::ValueType &type_in, + const SparseTensorValueIndex &index_in, + const std::vector<T> &cells_in) + : _type(type_in), + _index(index_in), + _cells(cells_in) +{ +} + +template<typename T> +SparseTensorValue<T>::SparseTensorValue(eval::ValueType &&type_in, SparseTensorValueIndex &&index_in, std::vector<T> &&cells_in) + : _type(std::move(type_in)), + _index(std::move(index_in)), + _cells(std::move(cells_in)) +{ +} + +template<typename T> SparseTensorValue<T>::~SparseTensorValue() = default; + +template class SparseTensorValue<float>; +template class SparseTensorValue<double>; + +//----------------------------------------------------------------------------- + +} // namespace + +VESPALIB_HASH_MAP_INSTANTIATE(vespalib::tensor::SparseTensorAddressRef, uint32_t); diff --git a/eval/src/vespa/eval/tensor/sparse/sparse_tensor_value.h b/eval/src/vespa/eval/tensor/sparse/sparse_tensor_value.h new file mode 100644 index 00000000000..61e412b0191 --- /dev/null +++ b/eval/src/vespa/eval/tensor/sparse/sparse_tensor_value.h @@ -0,0 +1,59 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "sparse_tensor_address_ref.h" +#include <vespa/eval/eval/value.h> +#include <vespa/eval/tensor/types.h> +#include <vespa/vespalib/stllike/hash_map.h> +#include <vespa/vespalib/stllike/string.h> +#include <vespa/vespalib/util/stash.h> + +namespace vespalib::tensor { + +struct SparseTensorValueIndex : public vespalib::eval::Value::Index +{ + using View = vespalib::eval::Value::Index::View; + using SubspaceMap = hash_map<SparseTensorAddressRef, uint32_t, hash<SparseTensorAddressRef>, + std::equal_to<>, hashtable_base::and_modulator>; + + Stash _stash; + SubspaceMap _map; + size_t _num_mapped_dims; + + explicit SparseTensorValueIndex(size_t num_mapped_dims_in); + SparseTensorValueIndex(const SparseTensorValueIndex & index_in); + SparseTensorValueIndex(SparseTensorValueIndex && index_in) = default; + ~SparseTensorValueIndex(); + size_t size() const override; + std::unique_ptr<View> create_view(const std::vector<size_t> &dims) const override; + void add_subspace(SparseTensorAddressRef tmp_ref, size_t idx); +}; + +/** + * A tensor implementation using serialized tensor addresses to + * improve CPU cache and TLB hit ratio, relative to SimpleTensor + * implementation. + */ +template<typename T> +class SparseTensorValue : public vespalib::eval::Value +{ +private: + eval::ValueType _type; + SparseTensorValueIndex _index; + std::vector<T> _cells; +public: + SparseTensorValue(const eval::ValueType &type_in, const SparseTensorValueIndex &index_in, const std::vector<T> &cells_in); + + SparseTensorValue(eval::ValueType &&type_in, SparseTensorValueIndex &&index_in, std::vector<T> &&cells_in); + + ~SparseTensorValue() override; + + TypedCells cells() const override { return TypedCells(_cells); } + + const Index &index() const override { return _index; } + + const eval::ValueType &type() const override { return _type; } +}; + +} // namespace diff --git a/eval/src/vespa/eval/tensor/sparse/sparse_tensor_value_builder.cpp b/eval/src/vespa/eval/tensor/sparse/sparse_tensor_value_builder.cpp new file mode 100644 index 00000000000..07ba2b217ac --- /dev/null +++ b/eval/src/vespa/eval/tensor/sparse/sparse_tensor_value_builder.cpp @@ -0,0 +1,35 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "sparse_tensor_value_builder.h" + +namespace vespalib::tensor { + +template <typename T> +ArrayRef<T> +SparseTensorValueBuilder<T>::add_subspace(const std::vector<vespalib::stringref> &addr) +{ + uint32_t idx = _cells.size(); + _cells.resize(idx + 1); + _addr_builder.clear(); + for (const auto & label : addr) { + _addr_builder.add(label); + } + auto tmp_ref = _addr_builder.getAddressRef(); + _index.add_subspace(tmp_ref, idx); + return ArrayRef<T>(&_cells[idx], 1); +} + +template <typename T> +std::unique_ptr<eval::Value> +SparseTensorValueBuilder<T>::build(std::unique_ptr<eval::ValueBuilder<T>>) +{ + return std::make_unique<SparseTensorValue<T>>(std::move(_type), + std::move(_index), + std::move(_cells)); + +} + +template class SparseTensorValueBuilder<float>; +template class SparseTensorValueBuilder<double>; + +} // namespace diff --git a/eval/src/vespa/eval/tensor/sparse/sparse_tensor_value_builder.h b/eval/src/vespa/eval/tensor/sparse/sparse_tensor_value_builder.h new file mode 100644 index 00000000000..46d79482f3d --- /dev/null +++ b/eval/src/vespa/eval/tensor/sparse/sparse_tensor_value_builder.h @@ -0,0 +1,40 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "sparse_tensor_value.h" +#include "sparse_tensor_address_builder.h" + +namespace vespalib::tensor { + +/** + * A builder for SparseTensorValue objects + * appropriate for cell type T. + **/ +template <typename T> +class SparseTensorValueBuilder : public eval::ValueBuilder<T> +{ +private: + eval::ValueType _type; + SparseTensorValueIndex _index; + std::vector<T> _cells; + SparseTensorAddressBuilder _addr_builder; +public: + SparseTensorValueBuilder(const eval::ValueType &type, + size_t num_mapped_in, + size_t expected_subspaces) + : _type(type), + _index(num_mapped_in), + _cells() + { + assert(num_mapped_in > 0); + _cells.reserve(expected_subspaces); + } + + ~SparseTensorValueBuilder() override = default; + + ArrayRef<T> add_subspace(const std::vector<vespalib::stringref> &addr) override; + std::unique_ptr<eval::Value> build(std::unique_ptr<eval::ValueBuilder<T>> self) override; +}; + +} // namespace diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index 2776c374051..291011f91d9 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -406,7 +406,7 @@ public class Flags { public static final UnboundLongFlag NODE_OBJECT_CACHE_SIZE = defineLongFlag( "node-object-cache-size", - 1000, + 2000, "The number of deserialized Node objects to store in-memory.", "Takes effect on config server restart"); diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/application/MetricConsumer.java b/jdisc_core/src/main/java/com/yahoo/jdisc/application/MetricConsumer.java index f060f1840ff..44cfb205271 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/application/MetricConsumer.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/application/MetricConsumer.java @@ -39,30 +39,31 @@ import java.util.Map; public interface MetricConsumer { /** - * <p>Consume a call to <code>Metric.set(String, Number, Metric.Context)</code>.</p> + * Consume a call to <code>Metric.set(String, Number, Metric.Context)</code>. * - * @param key The name of the metric to modify. - * @param val The value to assign to the named metric. - * @param ctx The context to further describe this entry. + * @param key the name of the metric to modify + * @param val the value to assign to the named metric + * @param ctx the context to further describe this entry */ - public void set(String key, Number val, Metric.Context ctx); + void set(String key, Number val, Metric.Context ctx); /** - * <p>Consume a call to <code>Metric.add(String, Number, Metric.Context)</code>.</p> + * Consume a call to <code>Metric.add(String, Number, Metric.Context)</code>. * - * @param key The name of the metric to modify. - * @param val The value to add to the named metric. - * @param ctx The context to further describe this entry. + * @param key the name of the metric to modify + * @param val the value to add to the named metric + * @param ctx the context to further describe this entry */ - public void add(String key, Number val, Metric.Context ctx); + void add(String key, Number val, Metric.Context ctx); /** - * <p>Creates a <code>Metric.Context</code> object that encapsulates the given properties. The returned Context object + * Creates a <code>Metric.Context</code> object that encapsulates the given properties. The returned Context object * will be passed along every future call to <code>set(String, Number, Metric.Context)</code> and - * <code>add(String, Number, Metric.Context)</code> where the properties match those given here.</p> + * <code>add(String, Number, Metric.Context)</code> where the properties match those given here. * - * @param properties The properties to incorporate in the context. - * @return The created context. + * @param properties the properties to incorporate in the context + * @return the created context */ - public Metric.Context createContext(Map<String, ?> properties); + Metric.Context createContext(Map<String, ?> properties); + } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java index ba63376d61e..d359b86ae85 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Autoscaler.java @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.provision.autoscale; +import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.ClusterResources; import com.yahoo.config.provision.ClusterSpec; import com.yahoo.vespa.hosted.provision.Node; @@ -8,7 +9,11 @@ import com.yahoo.vespa.hosted.provision.NodeRepository; import com.yahoo.vespa.hosted.provision.applications.Cluster; import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -63,11 +68,13 @@ public class Autoscaler { private Optional<AllocatableClusterResources> autoscale(List<Node> clusterNodes, Limits limits, boolean exclusive) { if (unstable(clusterNodes)) return Optional.empty(); - ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type(); AllocatableClusterResources currentAllocation = new AllocatableClusterResources(clusterNodes, nodeRepository); - Optional<Double> cpuLoad = averageLoad(Resource.cpu, clusterNodes, clusterType); - Optional<Double> memoryLoad = averageLoad(Resource.memory, clusterNodes, clusterType); - Optional<Double> diskLoad = averageLoad(Resource.disk, clusterNodes, clusterType); + + MetricSnapshot metricSnapshot = new MetricSnapshot(clusterNodes, metricsDb, nodeRepository); + + Optional<Double> cpuLoad = metricSnapshot.averageLoad(Resource.cpu); + Optional<Double> memoryLoad = metricSnapshot.averageLoad(Resource.memory); + Optional<Double> diskLoad = metricSnapshot.averageLoad(Resource.disk); if (cpuLoad.isEmpty() || memoryLoad.isEmpty() || diskLoad.isEmpty()) return Optional.empty(); var target = ResourceTarget.idealLoad(cpuLoad.get(), memoryLoad.get(), diskLoad.get(), currentAllocation); @@ -93,23 +100,6 @@ public class Autoscaler { return Math.abs(r1 - r2) / (( r1 + r2) / 2) < threshold; } - /** - * Returns the average load of this resource in the measurement window, - * or empty if we are not in a position to make decisions from these measurements at this time. - */ - private Optional<Double> averageLoad(Resource resource, List<Node> clusterNodes, ClusterSpec.Type clusterType) { - NodeMetricsDb.Window window = metricsDb.getWindow(nodeRepository.clock().instant().minus(scalingWindow(clusterType)), - resource, - clusterNodes.stream().map(Node::hostname).collect(Collectors.toList())); - - // Require a total number of measurements scaling with the number of nodes, - // but don't require that we have at least that many from every node - if (window.measurementCount()/clusterNodes.size() < minimumMeasurementsPerNode(clusterType)) return Optional.empty(); - if (window.hostnames() != clusterNodes.size()) return Optional.empty(); - - return Optional.of(window.average()); - } - /** The duration of the window we need to consider to make a scaling decision. See also minimumMeasurementsPerNode */ static Duration scalingWindow(ClusterSpec.Type clusterType) { if (clusterType.isContent()) return Duration.ofHours(12); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Metric.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Metric.java new file mode 100644 index 00000000000..b98535f19c3 --- /dev/null +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Metric.java @@ -0,0 +1,46 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.provision.autoscale; + +/** + * A kind of measurement we are making for autoscaling purposes + * + * @author bratseth + */ +public enum Metric { + + cpu { // a node resource + public String fullName() { return "cpu.util"; } + float valueFromMetric(double metricValue) { return (float)metricValue / 100; } // % to ratio + }, + memory { // a node resource + public String fullName() { return "mem_total.util"; } + float valueFromMetric(double metricValue) { return (float)metricValue / 100; } // % to ratio + }, + disk { // a node resource + public String fullName() { return "disk.util"; } + float valueFromMetric(double metricValue) { return (float)metricValue / 100; } // % to ratio + }, + generation { // application config generation active on the node + public String fullName() { return "application_generation"; } + float valueFromMetric(double metricValue) { return (float)metricValue; } // Really an integer, ok up to 16M gens + }; + + /** The name of this metric as emitted from its source */ + public abstract String fullName(); + + /** Convert from the emitted value of this metric to the value we want to use here */ + abstract float valueFromMetric(double metricValue); + + public static Metric fromFullName(String name) { + for (Metric metric : values()) + if (metric.fullName().equals(name)) return metric; + throw new IllegalArgumentException("Metric '" + name + "' has no mapping"); + } + + public static Metric from(Resource resource) { + for (Metric metric : values()) + if (metric.name().equals(resource.name())) return metric; + throw new IllegalArgumentException("Resource '" + resource + "' does not map to a metric"); + } + +} diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java new file mode 100644 index 00000000000..46ba4351082 --- /dev/null +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricSnapshot.java @@ -0,0 +1,98 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.provision.autoscale; + +import com.yahoo.config.provision.ApplicationId; +import com.yahoo.config.provision.ClusterSpec; +import com.yahoo.vespa.hosted.provision.Node; +import com.yahoo.vespa.hosted.provision.NodeRepository; + +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A snapshot which implements the questions we want to ask about metrics for one cluster at one point in time. + * + * @author bratseth + */ +public class MetricSnapshot { + + private final List<Node> clusterNodes; + private final NodeMetricsDb db; + private final NodeRepository nodeRepository; + private final Map<String, Instant> startTimePerHost; + + public MetricSnapshot(List<Node> clusterNodes, NodeMetricsDb db, NodeRepository nodeRepository) { + this.clusterNodes = clusterNodes; + this.db = db; + this.nodeRepository = nodeRepository; + this.startTimePerHost = metricStartTimes(clusterNodes, db, nodeRepository); + } + + /** + * Returns the instant of the oldest metric to consider for each node, or an empty map if metrics from the + * entire (max) window should be considered. + */ + private static Map<String, Instant> metricStartTimes(List<Node> clusterNodes, + NodeMetricsDb db, + NodeRepository nodeRepository) { + ApplicationId application = clusterNodes.get(0).allocation().get().owner(); + List<NodeMetricsDb.AutoscalingEvent> deployments = db.getEvents(application); + Map<String, Instant> startTimePerHost = new HashMap<>(); + if (!deployments.isEmpty()) { + var deployment = deployments.get(deployments.size() - 1); + List<NodeMetricsDb.NodeMeasurements> generationMeasurements = + db.getMeasurements(deployment.time(), + Metric.generation, + clusterNodes.stream().map(Node::hostname).collect(Collectors.toList())); + for (Node node : clusterNodes) { + startTimePerHost.put(node.hostname(), nodeRepository.clock().instant()); // Discard all unless we can prove otherwise + var nodeGenerationMeasurements = + generationMeasurements.stream().filter(m -> m.hostname().equals(node.hostname())).findAny(); + if (nodeGenerationMeasurements.isPresent()) { + var firstMeasurementOfCorrectGeneration = + nodeGenerationMeasurements.get().asList().stream() + .filter(m -> m.value() >= deployment.generation()) + .findFirst(); + if (firstMeasurementOfCorrectGeneration.isPresent()) { + startTimePerHost.put(node.hostname(), firstMeasurementOfCorrectGeneration.get().at()); + } + } + } + } + return startTimePerHost; + } + + /** + * Returns the average load of this resource in the measurement window, + * or empty if we are not in a position to make decisions from these measurements at this time. + */ + public Optional<Double> averageLoad(Resource resource) { + ClusterSpec.Type clusterType = clusterNodes.get(0).allocation().get().membership().cluster().type(); + + List<NodeMetricsDb.NodeMeasurements> measurements = + db.getMeasurements(nodeRepository.clock().instant().minus(Autoscaler.scalingWindow(clusterType)), + Metric.from(resource), + clusterNodes.stream().map(Node::hostname).collect(Collectors.toList())); + measurements = filterStale(measurements, startTimePerHost); + + // Require a total number of measurements scaling with the number of nodes, + // but don't require that we have at least that many from every node + int measurementCount = measurements.stream().mapToInt(m -> m.size()).sum(); + if (measurementCount / clusterNodes.size() < Autoscaler.minimumMeasurementsPerNode(clusterType)) return Optional.empty(); + if (measurements.size() != clusterNodes.size()) return Optional.empty(); + + double measurementSum = measurements.stream().flatMap(m -> m.asList().stream()).mapToDouble(m -> m.value()).sum(); + return Optional.of(measurementSum / measurementCount); + } + + private List<NodeMetricsDb.NodeMeasurements> filterStale(List<NodeMetricsDb.NodeMeasurements> measurements, + Map<String, Instant> startTimePerHost) { + if (startTimePerHost.isEmpty()) return measurements; // Map is either empty or complete + return measurements.stream().map(m -> m.copyAfter(startTimePerHost.get(m.hostname()))).collect(Collectors.toList()); + } + +} diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsResponse.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsResponse.java index 87551a5bd5f..b4195b4cdf1 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsResponse.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/MetricsResponse.java @@ -46,16 +46,16 @@ public class MetricsResponse { private void consumeNodeMetrics(String hostname, Inspector node) { long timestampSecond = node.field("timestamp").asLong(); Map<String, Double> values = consumeMetrics(node.field("metrics")); - for (Resource resource : Resource.values()) - addMetricIfPresent(hostname, resource, timestampSecond, values); + for (Metric metric : Metric.values()) + addMetricIfPresent(hostname, metric, timestampSecond, values); } - private void addMetricIfPresent(String hostname, Resource resource, long timestampSecond, Map<String, Double> values) { - if (values.containsKey(resource.metricName())) + private void addMetricIfPresent(String hostname, Metric metric, long timestampSecond, Map<String, Double> values) { + if (values.containsKey(metric.fullName())) metricValues.add(new NodeMetrics.MetricValue(hostname, - resource.metricName(), + metric.fullName(), timestampSecond, - values.get(resource.metricName()))); + values.get(metric.fullName()))); } private void consumeServiceMetrics(String hostname, Inspector node) { diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsDb.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsDb.java index 316708732a7..635f3ffc081 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsDb.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsDb.java @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.provision.autoscale; +import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.ClusterSpec; import com.yahoo.vespa.hosted.provision.Node; import com.yahoo.vespa.hosted.provision.NodeRepository; @@ -9,6 +10,7 @@ import java.time.Clock; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -30,6 +32,9 @@ public class NodeMetricsDb { /** Measurements by key. Each list of measurements is sorted by increasing timestamp */ private final Map<NodeMeasurementsKey, NodeMeasurements> db = new HashMap<>(); + /** Events */ + private final List<AutoscalingEvent> events = new ArrayList<>(); + /** Lock all access for now since we modify lists inside a map */ private final Object lock = new Object(); @@ -37,120 +42,89 @@ public class NodeMetricsDb { this.nodeRepository = nodeRepository; } - /** Add measurements to this */ + /** Adds measurements to this. */ public void add(Collection<NodeMetrics.MetricValue> metricValues) { synchronized (lock) { for (var value : metricValues) { - Resource resource = Resource.fromMetric(value.name()); - NodeMeasurementsKey key = new NodeMeasurementsKey(value.hostname(), resource); + Metric metric = Metric.fromFullName(value.name()); + NodeMeasurementsKey key = new NodeMeasurementsKey(value.hostname(), metric); NodeMeasurements measurements = db.get(key); if (measurements == null) { // new node Optional<Node> node = nodeRepository.getNode(value.hostname()); if (node.isEmpty()) continue; if (node.get().allocation().isEmpty()) continue; measurements = new NodeMeasurements(value.hostname(), - resource, - node.get().allocation().get().membership().cluster().type()); + metric, + node.get().allocation().get().membership().cluster().type(), + new ArrayList<>()); db.put(key, measurements); } measurements.add(new Measurement(value.timestampSecond() * 1000, - (float)resource.valueFromMetric(value.value()))); + metric.valueFromMetric(value.value()))); } } } - /** Must be called intermittently (as long as add is called) to gc old measurements */ + /** Adds an event to this */ + public void add(AutoscalingEvent event) { + synchronized (lock) { + events.add(event); + } + } + + /** Must be called intermittently (as long as any add methods are called) to gc old data */ public void gc(Clock clock) { synchronized (lock) { // Each measurement is Object + long + float = 16 + 8 + 4 = 28 bytes // 12 hours with 1k nodes and 3 resources and 1 measurement/sec is about 5Gb - for (Iterator<NodeMeasurements> i = db.values().iterator(); i.hasNext(); ) { var measurements = i.next(); measurements.removeOlderThan(clock.instant().minus(Autoscaler.scalingWindow(measurements.type)).toEpochMilli()); if (measurements.isEmpty()) i.remove(); } - } - } - - /** Returns a window within which we can ask for specific information from this db */ - public Window getWindow(Instant startTime, Resource resource, List<String> hostnames) { - return new Window(startTime, resource, hostnames); - } - - public class Window { - - private final long startTime; - private final List<NodeMeasurementsKey> keys; - private Window(Instant startTime, Resource resource, List<String> hostnames) { - this.startTime = startTime.toEpochMilli(); - keys = hostnames.stream().map(hostname -> new NodeMeasurementsKey(hostname, resource)).collect(Collectors.toList()); + // TODO: gc events } + } - public int measurementCount() { - synchronized (lock) { - int count = 0; - for (NodeMeasurementsKey key : keys) { - NodeMeasurements measurements = db.get(key); - if (measurements == null) continue; - count += measurements.after(startTime).size(); - } - return count; - } - } - - /** Returns the count of hostnames which have measurements in this window */ - public int hostnames() { - synchronized (lock) { - int count = 0; - for (NodeMeasurementsKey key : keys) { - NodeMeasurements measurements = db.get(key); - if (measurements == null || measurements.isEmpty()) continue; - - if (measurements.get(measurements.size() - 1).timestamp >= startTime) - count++; - } - return count; + /** + * Returns a list of measurements with one entry for each of the given host names + * which have any values after startTime, in the same order + */ + public List<NodeMeasurements> getMeasurements(Instant startTime, Metric metric, List<String> hostnames) { + synchronized (lock) { + List<NodeMeasurements> measurementsList = new ArrayList<>(hostnames.size()); + for (String hostname : hostnames) { + NodeMeasurements measurements = db.get(new NodeMeasurementsKey(hostname, metric)); + if (measurements == null) continue; + measurements = measurements.copyAfter(startTime); + if (measurements.isEmpty()) continue; + measurementsList.add(measurements); } + return measurementsList; } + } - public double average() { - synchronized (lock) { - double sum = 0; - int count = 0; - for (NodeMeasurementsKey key : keys) { - NodeMeasurements measurements = db.get(key); - if (measurements == null) continue; - - int index = measurements.size() - 1; - while (index >= 0 && measurements.get(index).timestamp >= startTime) { - sum += measurements.get(index).value; - count++; - - index--; - } - } - return sum / count; - } + public List<AutoscalingEvent> getEvents(ApplicationId application) { + synchronized (lock) { + return events.stream().filter(event -> event.application().equals(application)).collect(Collectors.toList()); } - } private static class NodeMeasurementsKey { private final String hostname; - private final Resource resource; + private final Metric metric; - public NodeMeasurementsKey(String hostname, Resource resource) { + public NodeMeasurementsKey(String hostname, Metric metric) { this.hostname = hostname; - this.resource = resource; + this.metric = metric; } @Override public int hashCode() { - return Objects.hash(hostname, resource); + return Objects.hash(hostname, metric); } @Override @@ -159,65 +133,104 @@ public class NodeMetricsDb { if ( ! (o instanceof NodeMeasurementsKey)) return false; NodeMeasurementsKey other = (NodeMeasurementsKey)o; if ( ! this.hostname.equals(other.hostname)) return false; - if ( ! this.resource.equals(other.resource)) return false; + if ( ! this.metric.equals(other.metric)) return false; return true; } @Override - public String toString() { return "key to measurements of " + resource + " for " + hostname; } + public String toString() { return "key to measurements of " + metric + " for " + hostname; } } - private static class NodeMeasurements { + public static class NodeMeasurements { private final String hostname; - private final Resource resource; + private final Metric metric; private final ClusterSpec.Type type; - private final List<Measurement> measurements = new ArrayList<>(); + private final List<Measurement> measurements; - public NodeMeasurements(String hostname, Resource resource, ClusterSpec.Type type) { + // Note: This transfers ownership of the measurement list to this + private NodeMeasurements(String hostname, Metric metric, ClusterSpec.Type type, List<Measurement> measurements) { this.hostname = hostname; - this.resource = resource; + this.metric = metric; this.type = type; + this.measurements = measurements; } - void removeOlderThan(long oldestTimestamp) { - while (!measurements.isEmpty() && measurements.get(0).timestamp < oldestTimestamp) - measurements.remove(0); - } + // Public access + + public boolean isEmpty() { return measurements.isEmpty(); } - boolean isEmpty() { return measurements.isEmpty(); } + public int size() { return measurements.size(); } - int size() { return measurements.size(); } + public Measurement get(int index) { return measurements.get(index); } - Measurement get(int index) { return measurements.get(index); } + public List<Measurement> asList() { return Collections.unmodifiableList(measurements); } - void add(Measurement measurement) { measurements.add(measurement); } + public String hostname() { return hostname; } - public List<Measurement> after(long oldestTimestamp) { - return measurements.stream() - .filter(measurement -> measurement.timestamp >= oldestTimestamp) - .collect(Collectors.toList()); + public NodeMeasurements copyAfter(Instant oldestTime) { + long oldestTimestamp = oldestTime.toEpochMilli(); + return new NodeMeasurements(hostname, metric, type, + measurements.stream() + .filter(measurement -> measurement.timestamp >= oldestTimestamp) + .collect(Collectors.toList())); + } + + // Private mutation + + private void add(Measurement measurement) { measurements.add(measurement); } + + private void removeOlderThan(long oldestTimestamp) { + while (!measurements.isEmpty() && measurements.get(0).timestamp < oldestTimestamp) + measurements.remove(0); } } - private static class Measurement { + public static class Measurement { + // TODO: Order by timestamp /** The time of this measurement in epoch millis */ private final long timestamp; /** The measured value */ - private final float value; + private final double value; public Measurement(long timestamp, float value) { this.timestamp = timestamp; this.value = value; } + public double value() { return value; } + public Instant at() { return Instant.ofEpochMilli(timestamp); } + @Override public String toString() { return "measurement at " + timestamp + ": " + value; } } + public static class AutoscalingEvent { + + private final ApplicationId application; + private final long generation; + private final long timestamp; + + public AutoscalingEvent(ApplicationId application, long generation, Instant times) { + this.application = application; + this.generation = generation; + this.timestamp = times.toEpochMilli(); + } + + /** Returns the deployed application */ + public ApplicationId application() { return application; } + + /** Returns the application config generation resulting from this deployment */ + public long generation() { return generation; } + + /** Returns the time of this deployment */ + public Instant time() { return Instant.ofEpochMilli(timestamp); } + + } + } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Resource.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Resource.java index 3d5ce8881e0..7f1844efdbe 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Resource.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/Resource.java @@ -12,41 +12,31 @@ public enum Resource { /** Cpu utilization ratio */ cpu { - public String metricName() { return "cpu.util"; } double idealAverageLoad() { return 0.2; } double valueFrom(NodeResources resources) { return resources.vcpu(); } - double valueFromMetric(double metricValue) { return metricValue / 100; } // % to ratio }, /** Memory utilization ratio */ memory { - public String metricName() { return "mem_total.util"; } double idealAverageLoad() { return 0.7; } double valueFrom(NodeResources resources) { return resources.memoryGb(); } - double valueFromMetric(double metricValue) { return metricValue / 100; } // % to ratio }, /** Disk utilization ratio */ disk { - public String metricName() { return "disk.util"; } double idealAverageLoad() { return 0.6; } double valueFrom(NodeResources resources) { return resources.diskGb(); } - double valueFromMetric(double metricValue) { return metricValue / 100; } // % to ratio }; - public abstract String metricName(); - /** The load we should have of this resource on average, when one node in the cluster is down */ abstract double idealAverageLoad(); abstract double valueFrom(NodeResources resources); - abstract double valueFromMetric(double metricValue); - - public static Resource fromMetric(String metricName) { + public static Resource from(Metric metric) { for (Resource resource : values()) - if (resource.metricName().equals(metricName)) return resource; - throw new IllegalArgumentException("Metric '" + metricName + "' does not map to a resource"); + if (resource.name().equals(metric.name())) return resource; + throw new IllegalArgumentException("Metric '" + metric + "' does not map to a resource"); } } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java index b3c76300173..b41b7f15499 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java @@ -83,7 +83,7 @@ public abstract class ApplicationMaintainer extends NodeRepositoryMaintainer { try (MaintenanceDeployment deployment = new MaintenanceDeployment(application, deployer, metric, nodeRepository())) { if ( ! deployment.isValid()) return false; // this will be done at another config server log.log(Level.INFO, application + " will be deployed, last deploy time " + getLastDeployTime(application)); - return deployment.activate(); + return deployment.activate().isPresent(); } finally { pendingDeployments.remove(application); } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/AutoscalingMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/AutoscalingMaintainer.java index ddee1afe21e..80077990e6a 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/AutoscalingMaintainer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/AutoscalingMaintainer.java @@ -5,7 +5,6 @@ import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.ClusterResources; import com.yahoo.config.provision.ClusterSpec; import com.yahoo.config.provision.Deployer; -import com.yahoo.config.provision.NodeResources; import com.yahoo.jdisc.Metric; import com.yahoo.vespa.hosted.provision.Node; import com.yahoo.vespa.hosted.provision.NodeRepository; @@ -29,6 +28,7 @@ import java.util.stream.Collectors; */ public class AutoscalingMaintainer extends NodeRepositoryMaintainer { + private final NodeMetricsDb metricsDb; private final Autoscaler autoscaler; private final Deployer deployer; private final Metric metric; @@ -40,6 +40,7 @@ public class AutoscalingMaintainer extends NodeRepositoryMaintainer { Duration interval) { super(nodeRepository, interval, metric); this.autoscaler = new Autoscaler(metricsDb, nodeRepository); + this.metricsDb = metricsDb; this.metric = metric; this.deployer = deployer; } @@ -72,7 +73,12 @@ public class AutoscalingMaintainer extends NodeRepositoryMaintainer { applications().put(application.with(cluster.get().withTarget(target)), deployment.applicationLock().get()); if (target.isPresent()) { logAutoscaling(target.get(), applicationId, clusterId, clusterNodes); - deployment.activate(); + Optional<Long> resultingGeneration = deployment.activate(); + if (resultingGeneration.isEmpty()) return; // Failed to activate + + metricsDb.add(new NodeMetricsDb.AutoscalingEvent(applicationId, + resultingGeneration.get(), + nodeRepository().clock().instant())); } } } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MaintenanceDeployment.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MaintenanceDeployment.java index 4e1be9c486c..75fd16697b4 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MaintenanceDeployment.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/MaintenanceDeployment.java @@ -9,6 +9,7 @@ import com.yahoo.config.provision.TransientException; import com.yahoo.jdisc.Metric; import java.util.Objects; +import java.util.function.Supplier; import java.util.logging.Level; import com.yahoo.transaction.Mutex; import com.yahoo.vespa.hosted.provision.Node; @@ -71,33 +72,32 @@ class MaintenanceDeployment implements Closeable { } public boolean prepare() { - return doStep(() -> deployment.get().prepare()); + return doStep(() -> { deployment.get().prepare(); return 0L; }).isPresent(); } /** * Attempts to activate this deployment * - * @return whether it was successfully activated + * @return the application config generation resulting from this deployment, or empty if it was not successful */ - public boolean activate() { + public Optional<Long> activate() { return doStep(() -> deployment.get().activate()); } - private boolean doStep(Runnable action) { + private Optional<Long> doStep(Supplier<Long> step) { if (closed) throw new IllegalStateException(this + "' is closed"); - if ( ! isValid()) return false; + if ( ! isValid()) return Optional.empty(); try { - action.run(); - return true; + return Optional.of(step.get()); } catch (TransientException e) { metric.add("maintenanceDeployment.transientFailure", 1, metric.createContext(Map.of())); log.log(Level.INFO, "Failed to maintenance deploy " + application + " with a transient error: " + Exceptions.toMessageString(e)); - return false; + return Optional.empty(); } catch (RuntimeException e) { metric.add("maintenanceDeployment.failure", 1, metric.createContext(Map.of())); log.log(Level.WARNING, "Exception on maintenance deploy of " + application, e); - return false; + return Optional.empty(); } } @@ -175,7 +175,7 @@ class MaintenanceDeployment implements Closeable { if (expectedNewNode.isEmpty()) return false; if (!expectedNewNode.get().hasParent(toHost.hostname())) return false; } - if ( ! deployment.activate()) return false; + if ( deployment.activate().isEmpty()) return false; log.info(agent + " redeployed " + application + " to " + ( verifyTarget ? this : "move " + (node.hostname() + " from " + fromHost))); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java index 0dd7cfe47f0..0f351ff338d 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRepositoryMaintenance.java @@ -191,7 +191,7 @@ public class NodeRepositoryMaintenance extends AbstractComponent { // Should be equal to timeout for deployments reservationExpiry = zone.system().isCd() ? Duration.ofMinutes(5) : Duration.ofMinutes(30); scalingSuggestionsInterval = Duration.ofMinutes(31); - spareCapacityMaintenanceInterval = Duration.ofMinutes(10); + spareCapacityMaintenanceInterval = Duration.ofMinutes(30); throttlePolicy = NodeFailer.ThrottlePolicy.hosted; if (zone.environment().equals(Environment.prod) && ! zone.system().isCd()) { diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/RetiredExpirer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/RetiredExpirer.java index 5b7f90102ba..62c90ffe433 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/RetiredExpirer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/RetiredExpirer.java @@ -68,7 +68,7 @@ public class RetiredExpirer extends NodeRepositoryMaintainer { nodeRepository().setRemovable(application, nodesToRemove); - boolean success = deployment.activate(); + boolean success = deployment.activate().isPresent(); if ( ! success) return success; String nodeList = nodesToRemove.stream().map(Node::hostname).collect(Collectors.joining(", ")); log.info("Redeployed " + application + " to deactivate retired nodes: " + nodeList); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/GroupPreparer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/GroupPreparer.java index 75f3c892571..b2cb7f545ff 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/GroupPreparer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/GroupPreparer.java @@ -28,6 +28,8 @@ import java.util.stream.Collectors; */ public class GroupPreparer { + private static final Mutex PROBE_LOCK = () -> {}; + private final NodeRepository nodeRepository; private final Optional<HostProvisioner> hostProvisioner; private final ListFlag<HostCapacity> preprovisionCapacityFlag; @@ -59,28 +61,26 @@ public class GroupPreparer { List<Node> surplusActiveNodes, MutableInteger highestIndex, int wantedGroups) { boolean dynamicProvisioningEnabled = nodeRepository.canProvisionHosts() && nodeRepository.zone().getCloud().dynamicProvisioning(); boolean allocateFully = dynamicProvisioningEnabled && preprovisionCapacityFlag.value().isEmpty(); - try (Mutex lock = nodeRepository.lock(application)) { - // Lock ready pool to ensure that the same nodes are not simultaneously allocated by others - try (Mutex allocationLock = nodeRepository.lockUnallocated()) { + // Try preparing in memory without lock. Most of the time there should be no changes and we can return nodes + // previously allocated. + { + MutableInteger probePrepareHighestIndex = new MutableInteger(highestIndex.get()); + NodeAllocation probeAllocation = prepareAllocation(application, cluster, requestedNodes, surplusActiveNodes, + probePrepareHighestIndex, wantedGroups, allocateFully, PROBE_LOCK); + if (probeAllocation.fulfilledAndNoChanges()) { + List<Node> acceptedNodes = probeAllocation.finalNodes(); + surplusActiveNodes.removeAll(acceptedNodes); + highestIndex.set(probePrepareHighestIndex.get()); + return acceptedNodes; + } + } - // Create a prioritized set of nodes - LockedNodeList allNodes = nodeRepository.list(allocationLock); - NodeAllocation allocation = new NodeAllocation(allNodes, application, cluster, requestedNodes, - highestIndex, nodeRepository); - - NodePrioritizer prioritizer = new NodePrioritizer(allNodes, - application, - cluster, - requestedNodes, - wantedGroups, - allocateFully, - nodeRepository); - prioritizer.addApplicationNodes(); - prioritizer.addSurplusNodes(surplusActiveNodes); - prioritizer.addReadyNodes(); - prioritizer.addNewDockerNodes(); - allocation.offer(prioritizer.prioritize()); + // There were some changes, so re-do the allocation with locks + try (Mutex lock = nodeRepository.lock(application)) { + try (Mutex allocationLock = nodeRepository.lockUnallocated()) { + NodeAllocation allocation = prepareAllocation(application, cluster, requestedNodes, surplusActiveNodes, + highestIndex, wantedGroups, allocateFully, allocationLock); if (dynamicProvisioningEnabled) { Version osVersion = nodeRepository.osVersions().targetFor(NodeType.host).orElse(Version.emptyVersion); @@ -122,4 +122,22 @@ public class GroupPreparer { } } + private NodeAllocation prepareAllocation(ApplicationId application, ClusterSpec cluster, NodeSpec requestedNodes, + List<Node> surplusActiveNodes, MutableInteger highestIndex, int wantedGroups, + boolean allocateFully, Mutex allocationLock) { + LockedNodeList allNodes = nodeRepository.list(allocationLock); + NodeAllocation allocation = new NodeAllocation(allNodes, application, cluster, requestedNodes, + highestIndex, nodeRepository); + NodePrioritizer prioritizer = new NodePrioritizer(allNodes, + application, cluster, requestedNodes, wantedGroups, allocateFully, nodeRepository); + + prioritizer.addApplicationNodes(); + prioritizer.addSurplusNodes(surplusActiveNodes); + prioritizer.addReadyNodes(); + prioritizer.addNewDockerNodes(); + allocation.offer(prioritizer.prioritize()); + + return allocation; + } + } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java index b07ce786685..0ebcb34703f 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java @@ -305,6 +305,11 @@ class NodeAllocation { return requestedNodes.fulfilledBy(accepted); } + /** Returns true this allocation was already fulfilled and resulted in no new changes */ + public boolean fulfilledAndNoChanges() { + return fulfilled() && reservableNodes().isEmpty() && newNodes().isEmpty(); + } + /** * Returns {@link FlavorCount} describing the docker node deficit for the given {@link NodeSpec}. * diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodePrioritizer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodePrioritizer.java index 226079e43fe..4056b14d4b9 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodePrioritizer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodePrioritizer.java @@ -10,6 +10,7 @@ import com.yahoo.vespa.hosted.provision.Node; import com.yahoo.vespa.hosted.provision.NodeList; import com.yahoo.vespa.hosted.provision.NodeRepository; import com.yahoo.vespa.hosted.provision.node.IP; +import com.yahoo.yolean.Exceptions; import java.util.ArrayList; import java.util.EnumSet; @@ -131,7 +132,9 @@ public class NodePrioritizer { allocation = host.ipConfig().pool().findAllocation(allNodes, nodeRepository.nameResolver()); if (allocation.isEmpty()) continue; // No free addresses in this pool } catch (Exception e) { - log.log(Level.WARNING, "Failed allocating IP address on " + host.hostname(), e); + log.log(Level.WARNING, "Failed allocating IP address on " + host.hostname() + " to " + + application + ", cluster " + clusterSpec.id() + ": " + + Exceptions.toMessageString(e)); continue; } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/LocksResponse.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/LocksResponse.java new file mode 100644 index 00000000000..8692be2b322 --- /dev/null +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/LocksResponse.java @@ -0,0 +1,106 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.provision.restapi; + +import com.yahoo.container.jdisc.HttpResponse; +import com.yahoo.slime.Cursor; +import com.yahoo.slime.JsonFormat; +import com.yahoo.slime.Slime; +import com.yahoo.vespa.curator.stats.LockAttempt; +import com.yahoo.vespa.curator.stats.LockCounters; +import com.yahoo.vespa.curator.stats.ThreadLockStats; + +import java.io.IOException; +import java.io.OutputStream; +import java.time.Instant; +import java.util.List; +import java.util.TreeMap; + +/** + * Returns information related to ZooKeeper locks. + * + * @author hakon + */ +public class LocksResponse extends HttpResponse { + + private final Slime slime = new Slime(); + + public LocksResponse() { + this(new TreeMap<>(ThreadLockStats.getLockCountersByPath()), + ThreadLockStats.getThreadLockStats(), + ThreadLockStats.getLockAttemptSamples()); + } + + /** For testing */ + LocksResponse(TreeMap<String, LockCounters> lockCountersByPath, + List<ThreadLockStats> threadLockStatsList, + List<LockAttempt> historicSamples) { + super(200); + + Cursor root = slime.setObject(); + + Cursor lockPathsCursor = root.setArray("lock-paths"); + lockCountersByPath.forEach((lockPath, lockCounters) -> { + Cursor lockPathCursor = lockPathsCursor.addObject(); + lockPathCursor.setString("path", lockPath); + lockPathCursor.setLong("in-critical-region", lockCounters.inCriticalRegionCount()); + lockPathCursor.setLong("invoke-acquire", lockCounters.invokeAcquireCount()); + lockPathCursor.setLong("acquire-failed", lockCounters.acquireFailedCount()); + lockPathCursor.setLong("acquire-timed-out", lockCounters.acquireTimedOutCount()); + lockPathCursor.setLong("lock-acquired", lockCounters.lockAcquiredCount()); + lockPathCursor.setLong("locks-released", lockCounters.locksReleasedCount()); + lockPathCursor.setLong("no-locks-errors", lockCounters.noLocksErrorCount()); + lockPathCursor.setLong("lock-release-errors", lockCounters.lockReleaseErrorCount()); + }); + + Cursor threadsCursor = root.setArray("threads"); + for (var threadLockStats : threadLockStatsList) { + List<LockAttempt> lockAttempts = threadLockStats.getLockAttempts(); + if (!lockAttempts.isEmpty()) { + Cursor threadLockStatsCursor = threadsCursor.addObject(); + threadLockStatsCursor.setString("thread-name", threadLockStats.getThreadName()); + + Cursor lockAttemptsCursor = threadLockStatsCursor.setArray("active-locks"); + for (var lockAttempt : lockAttempts) { + setLockAttempt(lockAttemptsCursor.addObject(), lockAttempt, false); + } + + threadLockStatsCursor.setString("stack-trace", threadLockStats.getStackTrace()); + } + } + + Cursor historicSamplesCursor = root.setArray("historic-samples"); + historicSamples.forEach(lockAttempt -> setLockAttempt(historicSamplesCursor.addObject(), lockAttempt, true)); + } + + @Override + public void render(OutputStream stream) throws IOException { + new JsonFormat(true).encode(stream, slime); + } + + @Override + public String getContentType() { + return "application/json"; + } + + private void setLockAttempt(Cursor lockAttemptCursor, LockAttempt lockAttempt, boolean includeThreadInfo) { + if (includeThreadInfo) { + lockAttemptCursor.setString("thread-name", lockAttempt.getThreadName()); + } + lockAttemptCursor.setString("lock-path", lockAttempt.getLockPath()); + lockAttemptCursor.setString("invoke-acquire-time", toString(lockAttempt.getTimeAcquiredWasInvoked())); + lockAttemptCursor.setString("acquire-timeout", lockAttempt.getAcquireTimeout().toString()); + lockAttempt.getTimeLockWasAcquired().ifPresent(instant -> lockAttemptCursor.setString("lock-acquired-time", toString(instant))); + lockAttemptCursor.setString("lock-state", lockAttempt.getLockState().name()); + lockAttempt.getTimeTerminalStateWasReached().ifPresent(instant -> lockAttemptCursor.setString("terminal-state-time", toString(instant))); + lockAttemptCursor.setString("acquire-duration", lockAttempt.getDurationOfAcquire().toString()); + lockAttemptCursor.setString("locked-duration", lockAttempt.getDurationWithLock().toString()); + lockAttemptCursor.setString("total-duration", lockAttempt.getDuration().toString()); + if (includeThreadInfo) { + lockAttempt.getStackTrace().ifPresent(stackTrace -> lockAttemptCursor.setString("stack-trace", stackTrace)); + } + } + + private static String toString(Instant time) { + return Instant.ofEpochMilli(time.toEpochMilli()).toString(); + } +} diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiHandler.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiHandler.java index 1891b9d2f82..a2d599eab6e 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiHandler.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiHandler.java @@ -116,6 +116,7 @@ public class NodesV2ApiHandler extends LoggingRequestHandler { if (pathS.startsWith("/nodes/v2/state/")) return new NodesResponse(ResponseType.nodesInStateList, request, orchestrator, nodeRepository); if (pathS.startsWith("/nodes/v2/acl/")) return new NodeAclResponse(request, nodeRepository); if (pathS.equals( "/nodes/v2/command/")) return new ResourceResponse(request.getUri(), "restart", "reboot"); + if (pathS.equals( "/nodes/v2/locks")) return new LocksResponse(); if (pathS.equals( "/nodes/v2/maintenance/")) return new JobsResponse(nodeRepository.jobControl()); if (pathS.equals( "/nodes/v2/upgrade/")) return new UpgradeResponse(nodeRepository.infrastructureVersions(), nodeRepository.osVersions(), nodeRepository.dockerImages()); if (pathS.startsWith("/nodes/v2/capacity")) return new HostCapacityResponse(nodeRepository, request); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTest.java index 2bcbac828b8..00d1cd84e4c 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTest.java @@ -71,6 +71,34 @@ public class AutoscalingTest { tester.autoscale(application1, cluster1.id(), min, max)); } + /** We prefer fewer nodes for container clusters as (we assume) they all use the same disk and memory */ + @Test + public void test_autoscaling_single_container_group() { + NodeResources resources = new NodeResources(3, 100, 100, 1); + ClusterResources min = new ClusterResources( 2, 1, new NodeResources(1, 1, 1, 1)); + ClusterResources max = new ClusterResources(20, 1, new NodeResources(100, 1000, 1000, 1)); + AutoscalingTester tester = new AutoscalingTester(resources); + + ApplicationId application1 = tester.applicationId("application1"); + ClusterSpec cluster1 = tester.clusterSpec(ClusterSpec.Type.container, "cluster1"); + + // deploy + tester.deploy(application1, cluster1, 5, 1, resources); + + tester.addMeasurements(Resource.cpu, 0.25f, 1f, 120, application1); + ClusterResources scaledResources = tester.assertResources("Scaling up since cpu usage is too high", + 7, 1, 2.5, 80.0, 80.0, + tester.autoscale(application1, cluster1.id(), min, max)); + + tester.deploy(application1, cluster1, scaledResources); + tester.deactivateRetired(application1, cluster1, scaledResources); + + tester.addMeasurements(Resource.cpu, 0.1f, 1f, 120, application1); + tester.assertResources("Scaling down since cpu usage has gone down", + 4, 1, 2.5, 68.6, 68.6, + tester.autoscale(application1, cluster1.id(), min, max)); + } + @Test public void autoscaling_handles_disk_setting_changes() { NodeResources hostResources = new NodeResources(3, 100, 100, 1, NodeResources.DiskSpeed.slow); @@ -100,34 +128,6 @@ public class AutoscalingTest { .allMatch(n -> n.allocation().get().requestedResources().diskSpeed() == NodeResources.DiskSpeed.any); } - /** We prefer fewer nodes for container clusters as (we assume) they all use the same disk and memory */ - @Test - public void test_autoscaling_single_container_group() { - NodeResources resources = new NodeResources(3, 100, 100, 1); - ClusterResources min = new ClusterResources( 2, 1, new NodeResources(1, 1, 1, 1)); - ClusterResources max = new ClusterResources(20, 1, new NodeResources(100, 1000, 1000, 1)); - AutoscalingTester tester = new AutoscalingTester(resources); - - ApplicationId application1 = tester.applicationId("application1"); - ClusterSpec cluster1 = tester.clusterSpec(ClusterSpec.Type.container, "cluster1"); - - // deploy - tester.deploy(application1, cluster1, 5, 1, resources); - - tester.addMeasurements(Resource.cpu, 0.25f, 1f, 120, application1); - ClusterResources scaledResources = tester.assertResources("Scaling up since cpu usage is too high", - 7, 1, 2.5, 80.0, 80.0, - tester.autoscale(application1, cluster1.id(), min, max)); - - tester.deploy(application1, cluster1, scaledResources); - tester.deactivateRetired(application1, cluster1, scaledResources); - - tester.addMeasurements(Resource.cpu, 0.1f, 1f, 120, application1); - tester.assertResources("Scaling down since cpu usage has gone down", - 4, 1, 2.5, 68.6, 68.6, - tester.autoscale(application1, cluster1.id(), min, max)); - } - @Test public void autoscaling_respects_upper_limit() { NodeResources hostResources = new NodeResources(3, 100, 100, 1); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTester.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTester.java index cc755c01405..9e51fa37246 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTester.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTester.java @@ -132,7 +132,7 @@ class AutoscalingTester { float effectiveValue = (r == resource ? value : (float) r.idealAverageLoad() * otherResourcesLoad) * oneExtraNodeFactor; db.add(List.of(new NodeMetrics.MetricValue(node.hostname(), - r.metricName(), + Metric.from(r).fullName(), clock().instant().toEpochMilli(), effectiveValue * 100))); // the metrics are in % } @@ -146,7 +146,7 @@ class AutoscalingTester { clock().advance(Duration.ofMinutes(1)); for (Node node : nodes) { db.add(List.of(new NodeMetrics.MetricValue(node.hostname(), - resource.metricName(), + Metric.from(resource).fullName(), clock().instant().toEpochMilli(), value * 100))); // the metrics are in % } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsDbTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsDbTest.java index eaad4526591..c6809fd8369 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsDbTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsDbTest.java @@ -26,7 +26,7 @@ public class NodeMetricsDbTest { ProvisioningTester tester = new ProvisioningTester.Builder().build(); tester.makeReadyHosts(10, new NodeResources(10, 100, 1000, 10)) .activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); var hosts = tester.activate(app1, ClusterSpec.request(ClusterSpec.Type.container, ClusterSpec.Id.from("test")).vespaVersion("7.0").build(), @@ -45,11 +45,15 @@ public class NodeMetricsDbTest { // Avoid off-by-one bug when the below windows starts exactly on one of the above getEpochSecond() timestamps. clock.advance(Duration.ofMinutes(1)); - assertEquals(35, db.getWindow(clock.instant().minus(Duration.ofHours(6)), Resource.cpu, List.of(node0)).measurementCount()); - assertEquals( 0, db.getWindow(clock.instant().minus(Duration.ofHours(6)), Resource.memory, List.of(node0)).measurementCount()); + assertEquals(35, measurementCount(db.getMeasurements(clock.instant().minus(Duration.ofHours(6)), Metric.cpu, List.of(node0)))); + assertEquals( 0, measurementCount(db.getMeasurements(clock.instant().minus(Duration.ofHours(6)), Metric.memory, List.of(node0)))); db.gc(clock); - assertEquals( 5, db.getWindow(clock.instant().minus(Duration.ofHours(6)), Resource.cpu, List.of(node0)).measurementCount()); - assertEquals( 0, db.getWindow(clock.instant().minus(Duration.ofHours(6)), Resource.memory, List.of(node0)).measurementCount()); + assertEquals( 5, measurementCount(db.getMeasurements(clock.instant().minus(Duration.ofHours(6)), Metric.cpu, List.of(node0)))); + assertEquals( 0, measurementCount(db.getMeasurements(clock.instant().minus(Duration.ofHours(6)), Metric.memory, List.of(node0)))); + } + + private int measurementCount(List<NodeMetricsDb.NodeMeasurements> measurements) { + return measurements.stream().mapToInt(m -> m.size()).sum(); } } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsFetcherTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsFetcherTest.java index fea3e8da70a..ed6fa2c37b9 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsFetcherTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/NodeMetricsFetcherTest.java @@ -30,8 +30,8 @@ public class NodeMetricsFetcherTest { tester.makeReadyNodes(4, resources); // Creates (in order) host-1.yahoo.com, host-2.yahoo.com, host-3.yahoo.com, host-4.yahoo.com tester.activateTenantHosts(); - ApplicationId application1 = tester.makeApplicationId(); - ApplicationId application2 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); + ApplicationId application2 = ProvisioningTester.makeApplicationId(); tester.deploy(application1, Capacity.from(new ClusterResources(2, 1, resources))); // host-1.yahoo.com, host-2.yahoo.com tester.deploy(application2, Capacity.from(new ClusterResources(2, 1, resources))); // host-4.yahoo.com, host-3.yahoo.com diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/AutoscalingMaintainerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/AutoscalingMaintainerTest.java index 833daebc37a..d1a583e31c1 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/AutoscalingMaintainerTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/AutoscalingMaintainerTest.java @@ -5,27 +5,15 @@ import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.Capacity; import com.yahoo.config.provision.ClusterResources; import com.yahoo.config.provision.ClusterSpec; -import com.yahoo.config.provision.Environment; -import com.yahoo.config.provision.Flavor; import com.yahoo.config.provision.NodeResources; -import com.yahoo.config.provision.NodeType; -import com.yahoo.config.provision.RegionName; -import com.yahoo.config.provision.Zone; -import com.yahoo.config.provisioning.FlavorsConfig; -import com.yahoo.vespa.hosted.provision.Node; -import com.yahoo.vespa.hosted.provision.NodeRepository; -import com.yahoo.vespa.hosted.provision.autoscale.NodeMetrics; -import com.yahoo.vespa.hosted.provision.autoscale.NodeMetricsDb; -import com.yahoo.vespa.hosted.provision.autoscale.Resource; -import com.yahoo.vespa.hosted.provision.provisioning.FlavorConfigBuilder; -import com.yahoo.vespa.hosted.provision.provisioning.ProvisioningTester; +import com.yahoo.vespa.hosted.provision.autoscale.Metric; import com.yahoo.vespa.hosted.provision.testutils.MockDeployer; import org.junit.Test; import java.time.Duration; -import java.util.List; -import java.util.Map; +import java.time.Instant; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** @@ -38,34 +26,23 @@ public class AutoscalingMaintainerTest { @Test public void testAutoscalingMaintainer() { - ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east3"))).flavorsConfig(flavorsConfig()).build(); + ApplicationId app1 = AutoscalingMaintainerTester.makeApplicationId("app1"); + ClusterSpec cluster1 = AutoscalingMaintainerTester.containerClusterSpec(); - ApplicationId app1 = tester.makeApplicationId("app1"); - ClusterSpec cluster1 = tester.containerClusterSpec(); - - ApplicationId app2 = tester.makeApplicationId("app2"); - ClusterSpec cluster2 = tester.containerClusterSpec(); + ApplicationId app2 = AutoscalingMaintainerTester.makeApplicationId("app2"); + ClusterSpec cluster2 = AutoscalingMaintainerTester.containerClusterSpec(); NodeResources lowResources = new NodeResources(4, 4, 10, 0.1); NodeResources highResources = new NodeResources(6.5, 9, 20, 0.1); - Map<ApplicationId, MockDeployer.ApplicationContext> apps = Map.of( - app1, new MockDeployer.ApplicationContext(app1, cluster1, Capacity.from(new ClusterResources(2, 1, lowResources))), - app2, new MockDeployer.ApplicationContext(app2, cluster2, Capacity.from(new ClusterResources(2, 1, highResources)))); - MockDeployer deployer = new MockDeployer(tester.provisioner(), tester.clock(), apps); + AutoscalingMaintainerTester tester = new AutoscalingMaintainerTester( + new MockDeployer.ApplicationContext(app1, cluster1, Capacity.from(new ClusterResources(2, 1, lowResources))), + new MockDeployer.ApplicationContext(app2, cluster2, Capacity.from(new ClusterResources(2, 1, highResources)))); - NodeMetricsDb nodeMetricsDb = new NodeMetricsDb(tester.nodeRepository()); - AutoscalingMaintainer maintainer = new AutoscalingMaintainer(tester.nodeRepository(), - nodeMetricsDb, - deployer, - new TestMetric(), - Duration.ofMinutes(1)); - maintainer.maintain(); // noop - assertTrue(deployer.lastDeployTime(app1).isEmpty()); - assertTrue(deployer.lastDeployTime(app2).isEmpty()); - tester.makeReadyNodes(20, "flt", NodeType.host, 8); - tester.activateTenantHosts(); + tester.maintainer().maintain(); // noop + assertTrue(tester.deployer().lastDeployTime(app1).isEmpty()); + assertTrue(tester.deployer().lastDeployTime(app2).isEmpty()); tester.deploy(app1, cluster1, Capacity.from(new ClusterResources(5, 1, new NodeResources(4, 4, 10, 0.1)), new ClusterResources(5, 1, new NodeResources(4, 4, 10, 0.1)), @@ -74,40 +51,82 @@ public class AutoscalingMaintainerTest { new ClusterResources(10, 1, new NodeResources(6.5, 9, 20, 0.1)), false, true)); - maintainer.maintain(); // noop - assertTrue(deployer.lastDeployTime(app1).isEmpty()); - assertTrue(deployer.lastDeployTime(app2).isEmpty()); - - addMeasurements(Resource.cpu, 0.9f, 500, app1, tester.nodeRepository(), nodeMetricsDb); - addMeasurements(Resource.memory, 0.9f, 500, app1, tester.nodeRepository(), nodeMetricsDb); - addMeasurements(Resource.disk, 0.9f, 500, app1, tester.nodeRepository(), nodeMetricsDb); - addMeasurements(Resource.cpu, 0.9f, 500, app2, tester.nodeRepository(), nodeMetricsDb); - addMeasurements(Resource.memory, 0.9f, 500, app2, tester.nodeRepository(), nodeMetricsDb); - addMeasurements(Resource.disk, 0.9f, 500, app2, tester.nodeRepository(), nodeMetricsDb); + tester.maintainer().maintain(); // noop + assertTrue(tester.deployer().lastDeployTime(app1).isEmpty()); + assertTrue(tester.deployer().lastDeployTime(app2).isEmpty()); - maintainer.maintain(); - assertTrue(deployer.lastDeployTime(app1).isEmpty()); // since autoscaling is off - assertTrue(deployer.lastDeployTime(app2).isPresent()); - } + tester.addMeasurements(Metric.cpu, 0.9f, 500, app1); + tester.addMeasurements(Metric.memory, 0.9f, 500, app1); + tester.addMeasurements(Metric.disk, 0.9f, 500, app1); + tester.addMeasurements(Metric.cpu, 0.9f, 500, app2); + tester.addMeasurements(Metric.memory, 0.9f, 500, app2); + tester.addMeasurements(Metric.disk, 0.9f, 500, app2); - public void addMeasurements(Resource resource, float value, int count, ApplicationId applicationId, - NodeRepository nodeRepository, NodeMetricsDb db) { - List<Node> nodes = nodeRepository.getNodes(applicationId, Node.State.active); - for (int i = 0; i < count; i++) { - for (Node node : nodes) - db.add(List.of(new NodeMetrics.MetricValue(node.hostname(), - resource.metricName(), - nodeRepository.clock().instant().toEpochMilli(), - value * 100))); // the metrics are in % - } + tester.maintainer().maintain(); + assertTrue(tester.deployer().lastDeployTime(app1).isEmpty()); // since autoscaling is off + assertTrue(tester.deployer().lastDeployTime(app2).isPresent()); } - private FlavorsConfig flavorsConfig() { - FlavorConfigBuilder b = new FlavorConfigBuilder(); - b.addFlavor("flt", 30, 30, 40, 3, Flavor.Type.BARE_METAL); - b.addFlavor("cpu", 40, 20, 40, 3, Flavor.Type.BARE_METAL); - b.addFlavor("mem", 20, 40, 40, 3, Flavor.Type.BARE_METAL); - return b.build(); + @Test + public void autoscaling_discards_metric_values_from_before_rescaling() { + ApplicationId app1 = AutoscalingMaintainerTester.makeApplicationId("app1"); + ClusterSpec cluster1 = AutoscalingMaintainerTester.containerClusterSpec(); + NodeResources lowResources = new NodeResources(4, 4, 10, 0.1); + NodeResources highResources = new NodeResources(8, 8, 20, 0.1); + Capacity app1Capacity = Capacity.from(new ClusterResources(2, 1, lowResources), + new ClusterResources(4, 2, highResources)); + var tester = new AutoscalingMaintainerTester(new MockDeployer.ApplicationContext(app1, cluster1, app1Capacity)); + + // Initial deployment at time 0 + tester.deploy(app1, cluster1, app1Capacity); + + // Measure overload + tester.clock().advance(Duration.ofSeconds(1)); + tester.addMeasurements(Metric.cpu, 0.9f, 500, app1); + tester.addMeasurements(Metric.memory, 0.9f, 500, app1); + tester.addMeasurements(Metric.disk, 0.9f, 500, app1); + + // Causes autoscaling + tester.clock().advance(Duration.ofSeconds(1)); + Instant firstMaintenanceTime = tester.clock().instant(); + tester.maintainer().maintain(); + assertTrue(tester.deployer().lastDeployTime(app1).isPresent()); + assertEquals(firstMaintenanceTime.toEpochMilli(), tester.deployer().lastDeployTime(app1).get().toEpochMilli()); + assertEquals(1, tester.nodeMetricsDb().getEvents(app1).size()); + assertEquals(app1, tester.nodeMetricsDb().getEvents(app1).get(0).application()); + assertEquals(0, tester.nodeMetricsDb().getEvents(app1).get(0).generation()); + assertEquals(firstMaintenanceTime.toEpochMilli(), tester.nodeMetricsDb().getEvents(app1).get(0).time().toEpochMilli()); + + // Measure overload still, since change is not applied, but metrics are discarded + tester.clock().advance(Duration.ofSeconds(1)); + tester.addMeasurements(Metric.cpu, 0.9f, 500, app1); + tester.addMeasurements(Metric.memory, 0.9f, 500, app1); + tester.addMeasurements(Metric.disk, 0.9f, 500, app1); + tester.clock().advance(Duration.ofSeconds(1)); + tester.maintainer().maintain(); + assertEquals(firstMaintenanceTime.toEpochMilli(), tester.deployer().lastDeployTime(app1).get().toEpochMilli()); + + // Measure underload, but no autoscaling since we haven't measured we're on the new config generation + tester.clock().advance(Duration.ofSeconds(1)); + tester.addMeasurements(Metric.cpu, 0.1f, 500, app1); + tester.addMeasurements(Metric.memory, 0.1f, 500, app1); + tester.addMeasurements(Metric.disk, 0.1f, 500, app1); + tester.clock().advance(Duration.ofSeconds(1)); + tester.maintainer().maintain(); + assertEquals(firstMaintenanceTime.toEpochMilli(), tester.deployer().lastDeployTime(app1).get().toEpochMilli()); + + // Add measurement of the expected generation, leading to rescaling + tester.clock().advance(Duration.ofSeconds(1)); + tester.addMeasurements(Metric.generation, 0, 1, app1); + tester.addMeasurements(Metric.cpu, 0.1f, 500, app1); + tester.addMeasurements(Metric.memory, 0.1f, 500, app1); + tester.addMeasurements(Metric.disk, 0.1f, 500, app1); + //tester.clock().advance(Duration.ofSeconds(1)); + Instant lastMaintenanceTime = tester.clock().instant(); + tester.maintainer().maintain(); + assertEquals(lastMaintenanceTime.toEpochMilli(), tester.deployer().lastDeployTime(app1).get().toEpochMilli()); + assertEquals(2, tester.nodeMetricsDb().getEvents(app1).size()); + assertEquals(1, tester.nodeMetricsDb().getEvents(app1).get(1).generation()); } } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/AutoscalingMaintainerTester.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/AutoscalingMaintainerTester.java new file mode 100644 index 00000000000..31af40b4377 --- /dev/null +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/AutoscalingMaintainerTester.java @@ -0,0 +1,92 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.provision.maintenance; + +import com.yahoo.config.provision.ApplicationId; +import com.yahoo.config.provision.Capacity; +import com.yahoo.config.provision.ClusterResources; +import com.yahoo.config.provision.ClusterSpec; +import com.yahoo.config.provision.Environment; +import com.yahoo.config.provision.Flavor; +import com.yahoo.config.provision.NodeResources; +import com.yahoo.config.provision.NodeType; +import com.yahoo.config.provision.RegionName; +import com.yahoo.config.provision.Zone; +import com.yahoo.config.provisioning.FlavorsConfig; +import com.yahoo.test.ManualClock; +import com.yahoo.vespa.hosted.provision.Node; +import com.yahoo.vespa.hosted.provision.NodeRepository; +import com.yahoo.vespa.hosted.provision.autoscale.Metric; +import com.yahoo.vespa.hosted.provision.autoscale.NodeMetrics; +import com.yahoo.vespa.hosted.provision.autoscale.NodeMetricsDb; +import com.yahoo.vespa.hosted.provision.provisioning.FlavorConfigBuilder; +import com.yahoo.vespa.hosted.provision.provisioning.ProvisioningTester; +import com.yahoo.vespa.hosted.provision.testutils.MockDeployer; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * @author bratseth + */ +public class AutoscalingMaintainerTester { + + private final ProvisioningTester provisioningTester; + private final NodeMetricsDb nodeMetricsDb; + private final AutoscalingMaintainer maintainer; + private final MockDeployer deployer; + + public AutoscalingMaintainerTester(MockDeployer.ApplicationContext ... appContexts) { + provisioningTester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east3"))) + .flavorsConfig(flavorsConfig()) + .build(); + provisioningTester.clock().setInstant(Instant.ofEpochMilli(0)); + Map<ApplicationId, MockDeployer.ApplicationContext> apps = Arrays.stream(appContexts) + .collect(Collectors.toMap(c -> c.id(), c -> c)); + deployer = new MockDeployer(provisioningTester.provisioner(), provisioningTester.clock(), apps); + nodeMetricsDb = new NodeMetricsDb(provisioningTester.nodeRepository()); + maintainer = new AutoscalingMaintainer(provisioningTester.nodeRepository(), + nodeMetricsDb, + deployer, + new TestMetric(), + Duration.ofMinutes(1)); + provisioningTester.makeReadyNodes(20, "flt", NodeType.host, 8); + provisioningTester.activateTenantHosts(); + } + + public NodeRepository nodeRepository() { return provisioningTester.nodeRepository(); } + public ManualClock clock() { return provisioningTester.clock(); } + public MockDeployer deployer() { return deployer; } + public AutoscalingMaintainer maintainer() { return maintainer; } + public NodeMetricsDb nodeMetricsDb() { return nodeMetricsDb; } + + public static ApplicationId makeApplicationId(String name) { return ProvisioningTester.makeApplicationId(name); } + public static ClusterSpec containerClusterSpec() { return ProvisioningTester.containerClusterSpec(); } + + public List<Node> deploy(ApplicationId application, ClusterSpec cluster, Capacity capacity) { + return provisioningTester.deploy(application, cluster, capacity); + } + + public void addMeasurements(Metric metric, float value, int count, ApplicationId applicationId) { + List<Node> nodes = nodeRepository().getNodes(applicationId, Node.State.active); + for (int i = 0; i < count; i++) { + for (Node node : nodes) + nodeMetricsDb.add(List.of(new NodeMetrics.MetricValue(node.hostname(), + metric.fullName(), + clock().instant().getEpochSecond(), + value * 100))); // the metrics are in % + } + } + + private FlavorsConfig flavorsConfig() { + FlavorConfigBuilder b = new FlavorConfigBuilder(); + b.addFlavor("flt", 30, 30, 40, 3, Flavor.Type.BARE_METAL); + b.addFlavor("cpu", 40, 20, 40, 3, Flavor.Type.BARE_METAL); + b.addFlavor("mem", 20, 40, 40, 3, Flavor.Type.BARE_METAL); + return b.build(); + } + +} diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/DynamicProvisioningMaintainerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/DynamicProvisioningMaintainerTest.java index 9978f37e835..7cc81f218ec 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/DynamicProvisioningMaintainerTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/DynamicProvisioningMaintainerTest.java @@ -182,7 +182,7 @@ public class DynamicProvisioningMaintainerTest { tester.provisioningTester.activateTenantHosts(); // Allocating nodes to a host does not result in provisioning of additional capacity - ApplicationId application = tester.provisioningTester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); tester.provisioningTester.deploy(application, Capacity.from(new ClusterResources(2, 1, new NodeResources(4, 8, 50, 0.1)))); assertEquals(2, tester.nodeRepository.list().owner(application).size()); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/LoadBalancerExpirerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/LoadBalancerExpirerTest.java index 6c22f798fe0..4185ff2c25c 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/LoadBalancerExpirerTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/LoadBalancerExpirerTest.java @@ -45,8 +45,8 @@ public class LoadBalancerExpirerTest { // Deploy two applications with a total of three load balancers ClusterSpec.Id cluster1 = ClusterSpec.Id.from("qrs"); ClusterSpec.Id cluster2 = ClusterSpec.Id.from("qrs2"); - ApplicationId app1 = tester.makeApplicationId(); - ApplicationId app2 = tester.makeApplicationId(); + ApplicationId app1 = ProvisioningTester.makeApplicationId(); + ApplicationId app2 = ProvisioningTester.makeApplicationId(); LoadBalancerId lb1 = new LoadBalancerId(app1, cluster1); LoadBalancerId lb2 = new LoadBalancerId(app2, cluster1); LoadBalancerId lb3 = new LoadBalancerId(app2, cluster2); @@ -111,7 +111,7 @@ public class LoadBalancerExpirerTest { // Prepare application ClusterSpec.Id cluster = ClusterSpec.Id.from("qrs"); - ApplicationId app = tester.makeApplicationId(); + ApplicationId app = ProvisioningTester.makeApplicationId(); LoadBalancerId lb = new LoadBalancerId(app, cluster); deployApplication(app, false, cluster); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ScalingSuggestionsMaintainerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ScalingSuggestionsMaintainerTest.java index bd2afb5d1c8..05b2d1e9ec9 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ScalingSuggestionsMaintainerTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/ScalingSuggestionsMaintainerTest.java @@ -14,6 +14,7 @@ import com.yahoo.config.provision.Zone; import com.yahoo.config.provisioning.FlavorsConfig; import com.yahoo.vespa.hosted.provision.Node; import com.yahoo.vespa.hosted.provision.NodeRepository; +import com.yahoo.vespa.hosted.provision.autoscale.Metric; import com.yahoo.vespa.hosted.provision.autoscale.NodeMetrics; import com.yahoo.vespa.hosted.provision.autoscale.NodeMetricsDb; import com.yahoo.vespa.hosted.provision.autoscale.Resource; @@ -38,11 +39,11 @@ public class ScalingSuggestionsMaintainerTest { public void testScalingSuggestionsMaintainer() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east3"))).flavorsConfig(flavorsConfig()).build(); - ApplicationId app1 = tester.makeApplicationId("app1"); - ClusterSpec cluster1 = tester.containerClusterSpec(); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); + ClusterSpec cluster1 = ProvisioningTester.containerClusterSpec(); - ApplicationId app2 = tester.makeApplicationId("app2"); - ClusterSpec cluster2 = tester.contentClusterSpec(); + ApplicationId app2 = ProvisioningTester.makeApplicationId("app2"); + ClusterSpec cluster2 = ProvisioningTester.contentClusterSpec(); NodeMetricsDb nodeMetricsDb = new NodeMetricsDb(tester.nodeRepository()); @@ -81,7 +82,7 @@ public class ScalingSuggestionsMaintainerTest { for (int i = 0; i < count; i++) { for (Node node : nodes) db.add(List.of(new NodeMetrics.MetricValue(node.hostname(), - resource.metricName(), + Metric.from(resource).fullName(), nodeRepository.clock().instant().toEpochMilli(), value * 100))); // the metrics are in % } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/AclProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/AclProvisioningTest.java index 505e53c9195..62c6c0c9426 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/AclProvisioningTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/AclProvisioningTest.java @@ -38,14 +38,14 @@ public class AclProvisioningTest { // Populate repo tester.makeReadyNodes(10, new NodeResources(1, 4, 10, 1)); List<Node> dockerHost = tester.makeReadyNodes(1, new NodeResources(1, 4, 10, 1), NodeType.host); - ApplicationId zoneApplication = tester.makeApplicationId(); + ApplicationId zoneApplication = ProvisioningTester.makeApplicationId(); tester.deploy(zoneApplication, Capacity.fromRequiredNodeType(NodeType.host)); tester.makeReadyVirtualDockerNodes(1,new NodeResources(1, 4, 10, 1), dockerHost.get(0).hostname()); List<Node> proxyNodes = tester.makeReadyNodes(3, new NodeResources(1, 4, 10, 1), NodeType.proxy); // Allocate 2 nodes - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); List<Node> activeNodes = tester.deploy(application, Capacity.from(new ClusterResources(2, 1, new NodeResources(1, 4, 10, 1)), false, true)); assertEquals(2, activeNodes.size()); @@ -110,7 +110,7 @@ public class AclProvisioningTest { tester.makeReadyNodes(3, "default", NodeType.proxy); // Deploy zone application - ApplicationId zoneApplication = tester.makeApplicationId(); + ApplicationId zoneApplication = ProvisioningTester.makeApplicationId(); tester.deploy(zoneApplication, Capacity.fromRequiredNodeType(NodeType.proxy)); // Get trusted nodes for first proxy node @@ -152,7 +152,7 @@ public class AclProvisioningTest { tester.makeReadyNodes(3, "default", NodeType.controller); // Allocate - ApplicationId controllerApplication = tester.makeApplicationId(); + ApplicationId controllerApplication = ProvisioningTester.makeApplicationId(); List<Node> controllers = tester.deploy(controllerApplication, Capacity.fromRequiredNodeType(NodeType.controller)); // Controllers and hosts all trust each other @@ -172,7 +172,7 @@ public class AclProvisioningTest { } // Deploy application - var application = tester.makeApplicationId(); + var application = ProvisioningTester.makeApplicationId(); List<Node> activeNodes = deploy(application, 2); assertEquals(2, activeNodes.size()); @@ -207,7 +207,7 @@ public class AclProvisioningTest { } private List<Node> deploy(int nodeCount) { - return deploy(tester.makeApplicationId(), nodeCount); + return deploy(ProvisioningTester.makeApplicationId(), nodeCount); } private List<Node> deploy(ApplicationId application, int nodeCount) { diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningCompleteHostCalculatorTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningCompleteHostCalculatorTest.java index 2588818a9d3..d8ff34d1244 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningCompleteHostCalculatorTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningCompleteHostCalculatorTest.java @@ -32,7 +32,7 @@ public class DockerProvisioningCompleteHostCalculatorTest { .build(); tester.makeReadyHosts(9, hostFlavor.resources()).activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(ClusterSpec.Type.content, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); var initialResources = new NodeResources(2, 16, 50, 1); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningTest.java index 6ae78f9019c..d2a5e06469a 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningTest.java @@ -46,7 +46,7 @@ public class DockerProvisioningTest { @Test public void docker_application_deployment() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); for (int i = 1; i < 10; i++) tester.makeReadyVirtualDockerNodes(1, dockerResources, "dockerHost" + i); @@ -78,12 +78,12 @@ public class DockerProvisioningTest { public void refuses_to_activate_on_non_active_host() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId zoneApplication = tester.makeApplicationId(); + ApplicationId zoneApplication = ProvisioningTester.makeApplicationId(); List<Node> parents = tester.makeReadyNodes(10, new NodeResources(2, 4, 20, 2), NodeType.host, 1); for (Node parent : parents) tester.makeReadyVirtualDockerNodes(1, dockerResources, parent.hostname()); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); Version wantedVespaVersion = Version.fromString("6.39"); int nodeCount = 7; List<HostSpec> nodes = tester.prepare(application1, @@ -162,11 +162,11 @@ public class DockerProvisioningTest { for (int i = 13; i <= 16; i++) tester.makeReadyVirtualDockerNode(i, dockerResources, "host4"); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); prepareAndActivate(application1, 2, true, tester); assertEquals(Set.of("host1", "host2"), hostsOf(tester.getNodes(application1, Node.State.active))); - ApplicationId application2 = tester.makeApplicationId(); + ApplicationId application2 = ProvisioningTester.makeApplicationId(); prepareAndActivate(application2, 2, false, tester); assertEquals("Application is assigned to separate hosts", Set.of("host3", "host4"), hostsOf(tester.getNodes(application2, Node.State.active))); @@ -185,11 +185,11 @@ public class DockerProvisioningTest { for (int i = 13; i <= 16; i++) tester.makeReadyVirtualDockerNode(i, dockerResources, "host4"); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); prepareAndActivate(application1, 2, false, tester); assertEquals(Set.of("host1", "host2"), hostsOf(tester.getNodes(application1, Node.State.active))); - ApplicationId application2 = tester.makeApplicationId(); + ApplicationId application2 = ProvisioningTester.makeApplicationId(); prepareAndActivate(application2, 2, true, tester); assertEquals("Application is assigned to separate hosts", Set.of("host3", "host4"), hostsOf(tester.getNodes(application2, Node.State.active))); @@ -208,7 +208,7 @@ public class DockerProvisioningTest { for (int i = 13; i <= 16; i++) tester.makeReadyVirtualDockerNode(i, dockerResources, "host4"); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); prepareAndActivate(application1, 2, false, tester); for (Node node : tester.getNodes(application1, Node.State.active)) assertFalse(node.allocation().get().membership().cluster().isExclusive()); @@ -237,7 +237,7 @@ public class DockerProvisioningTest { for (int i = 13; i <= 16; i++) tester.makeReadyVirtualDockerNode(i, dockerResources, "host4"); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); prepareAndActivate(application1, 2, true, tester); assertEquals(Set.of("host1", "host2"), hostsOf(tester.getNodes(application1, Node.State.active))); @@ -266,7 +266,7 @@ public class DockerProvisioningTest { @Test public void get_specified_flavor_not_default_flavor_for_docker() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.test, RegionName.from("corp-us-east-1"))).build(); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); tester.makeReadyVirtualDockerNodes(1, dockerResources, "dockerHost"); List<HostSpec> hosts = tester.prepare(application1, @@ -284,7 +284,7 @@ public class DockerProvisioningTest { try { ProvisioningTester tester = new ProvisioningTester.Builder() .zone(new Zone(Environment.prod, RegionName.from("us-east-1"))).build(); - ApplicationId application1 = tester.makeApplicationId("app1"); + ApplicationId application1 = ProvisioningTester.makeApplicationId("app1"); tester.makeReadyVirtualDockerNodes(1, dockerResources, "dockerHost1"); tester.makeReadyVirtualDockerNodes(1, dockerResources, "dockerHost2"); @@ -311,7 +311,7 @@ public class DockerProvisioningTest { .build(); tester.makeReadyHosts(2, hostFlavor.resources()).activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(ClusterSpec.Type.content, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); var resources = new NodeResources(1, 8, 10, 1); @@ -331,7 +331,7 @@ public class DockerProvisioningTest { .build(); tester.makeReadyHosts(9, hostFlavor.resources()).activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(ClusterSpec.Type.content, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); var initialResources = new NodeResources(2, 16, 50, 1); @@ -366,7 +366,7 @@ public class DockerProvisioningTest { .build(); tester.makeReadyHosts(2, hostFlavor.resources()).activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(ClusterSpec.Type.content, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); // 5 Gb requested memory becomes 5-3=2 Gb real memory, which is an illegally small amount @@ -388,7 +388,7 @@ public class DockerProvisioningTest { .build(); tester.makeReadyHosts(5, r).activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(ClusterSpec.Type.container, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); tester.activate(app1, cluster1, Capacity.from(new ClusterResources(5, 1, r))); @@ -421,7 +421,7 @@ public class DockerProvisioningTest { .build(); tester.makeReadyHosts(4, r).activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(clusterType, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); tester.activate(app1, cluster1, Capacity.from(new ClusterResources(4, 1, r))); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DynamicDockerAllocationTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DynamicDockerAllocationTest.java index 7e416bfc397..89072223341 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DynamicDockerAllocationTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DynamicDockerAllocationTest.java @@ -231,7 +231,7 @@ public class DynamicDockerAllocationTest { tester.activateTenantHosts(); //Deploy an application having 6 nodes (3 nodes in 2 groups). We only have 5 docker hosts available - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); tester.prepare(application1, clusterSpec("myContent.t1.a1"), 6, 2, new NodeResources(1, 4, 100, 1)); fail("Two groups have been allocated to the same parent host"); @@ -247,7 +247,7 @@ public class DynamicDockerAllocationTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).flavorsConfig(flavorsConfig()).build(); // Setup test - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); tester.makeReadyNodes(5, "host-small", NodeType.host, 32); tester.activateTenantHosts(); NodeResources flavor = new NodeResources(1, 4, 100, 1); @@ -279,7 +279,7 @@ public class DynamicDockerAllocationTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.perf, RegionName.from("us-east"))).flavorsConfig(flavorsConfig()).build(); tester.makeReadyNodes(3, "host-small", NodeType.host, 32); tester.activateTenantHosts(); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); List<HostSpec> hosts = tester.prepare(application1, clusterSpec("myContent.t1.a1"), 3, 1, new NodeResources(1, 4, 100, 1)); tester.activate(application1, ImmutableSet.copyOf(hosts)); @@ -292,7 +292,7 @@ public class DynamicDockerAllocationTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(SystemName.cd, Environment.test, RegionName.from("us-east"))).flavorsConfig(flavorsConfig()).build(); tester.makeReadyNodes(4, new Flavor(new NodeResources(1, 8, 120, 1, NodeResources.DiskSpeed.slow)), NodeType.host, 10, true); tester.activateTenantHosts(); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); List<HostSpec> hosts = tester.prepare(application1, clusterSpec("myContent.t1.a1"), 3, 1, new NodeResources(1, 4, 100, 1)); tester.activate(application1, ImmutableSet.copyOf(hosts)); } @@ -303,7 +303,7 @@ public class DynamicDockerAllocationTest { tester.makeProvisionedNodes(3, "host-small", NodeType.host, 32).forEach(node -> tester.nodeRepository().fail(node.hostname(), Agent.system, getClass().getSimpleName())); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); tester.prepare(application, clusterSpec("myContent.t2.a2"), 2, 1, new NodeResources(1, 40, 100, 1)); } @@ -313,7 +313,7 @@ public class DynamicDockerAllocationTest { tester.makeReadyNodes(2, "host-large", NodeType.host, 10, true); tester.activateTenantHosts(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); List<HostSpec> hosts = tester.prepare(application, clusterSpec("myContent.t1.a1"), 2, 1, new NodeResources(1, 4, 100, 1)); tester.activate(application, hosts); @@ -344,7 +344,7 @@ public class DynamicDockerAllocationTest { tester.makeReadyNodes(2, new Flavor(new NodeResources(1, 8, 120, 1, NodeResources.DiskSpeed.slow)), NodeType.host, 10, true); tester.activateTenantHosts(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); ClusterSpec cluster = ClusterSpec.request(ClusterSpec.Type.container, ClusterSpec.Id.from("test")).vespaVersion("1").build(); NodeResources resources = new NodeResources(1, 4, 100, 1, NodeResources.DiskSpeed.any); @@ -361,7 +361,7 @@ public class DynamicDockerAllocationTest { tester.makeReadyNodes(2, new Flavor(new NodeResources(1, 8, 120, 1, NodeResources.DiskSpeed.slow)), NodeType.host, 10, true); tester.activateTenantHosts(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); ClusterSpec cluster = ClusterSpec.request(ClusterSpec.Type.container, ClusterSpec.Id.from("test")).vespaVersion("1").build(); NodeResources resources = new NodeResources(1, 4, 100, 1, requestDiskSpeed); @@ -383,7 +383,7 @@ public class DynamicDockerAllocationTest { tester.makeReadyNodes(2, new Flavor(new NodeResources(1, 8, 120, 1, NodeResources.DiskSpeed.slow)), NodeType.host, 10, true); tester.activateTenantHosts(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); ClusterSpec cluster = ClusterSpec.request(ClusterSpec.Type.container, ClusterSpec.Id.from("test")).vespaVersion("1").build(); NodeResources resources = new NodeResources(1, 4, 100, 1, NodeResources.DiskSpeed.fast); @@ -401,7 +401,7 @@ public class DynamicDockerAllocationTest { tester.makeReadyNodes(2, new Flavor(new NodeResources(5, 20, 1400, 3)), NodeType.host, 10, true); tester.activateTenantHosts(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); ClusterSpec cluster = ClusterSpec.request(ClusterSpec.Type.container, ClusterSpec.Id.from("test")).vespaVersion("1").build(); List<HostSpec> hosts1 = tester.prepare(application, cluster, Capacity.from(new ClusterResources(2, 1, NodeResources.fromLegacyName("d-2-8-500")), false, true)); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DynamicDockerProvisionTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DynamicDockerProvisionTest.java index 57c7c46c2d9..84850358798 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DynamicDockerProvisionTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DynamicDockerProvisionTest.java @@ -63,7 +63,7 @@ public class DynamicDockerProvisionTest { public void dynamically_provision_with_empty_node_repo() { assertEquals(0, tester.nodeRepository().list().size()); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); NodeResources flavor = new NodeResources(1, 4, 10, 1); mockHostProvisioner(hostProvisioner, tester.nodeRepository().flavors().getFlavorOrThrow("small")); @@ -83,7 +83,7 @@ public class DynamicDockerProvisionTest { tester.makeReadyNodes(3, "small", NodeType.host, 10); tester.activateTenantHosts(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); NodeResources flavor = new NodeResources(1, 4, 10, 1); mockHostProvisioner(hostProvisioner, tester.nodeRepository().flavors().getFlavorOrThrow("small")); @@ -93,7 +93,7 @@ public class DynamicDockerProvisionTest { @Test public void allocates_to_hosts_already_hosting_nodes_by_this_tenant() { - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); NodeResources flavor = new NodeResources(1, 4, 10, 1); List<Integer> expectedProvisionIndexes = List.of(100, 101); @@ -126,7 +126,7 @@ public class DynamicDockerProvisionTest { public void node_indices_are_unique_even_when_a_node_is_left_in_reserved_state() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(zone).build(); NodeResources resources = new NodeResources(10, 10, 10, 10); - ApplicationId app = tester.makeApplicationId(); + ApplicationId app = ProvisioningTester.makeApplicationId(); Function<Node, Node> retireNode = node -> tester.patchNode(node, (n) -> n.withWantToRetire(true, Agent.system, Instant.now())); Function<Integer, Node> getNodeInGroup = group -> tester.nodeRepository().getNodes(app).stream() @@ -171,7 +171,7 @@ public class DynamicDockerProvisionTest { tester.activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(ClusterSpec.Type.content, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); // Deploy using real memory amount (17) @@ -217,7 +217,7 @@ public class DynamicDockerProvisionTest { tester.activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(ClusterSpec.Type.content, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); // Limits where each number is within flavor limits but but which don't contain any flavor leads to an error @@ -292,7 +292,7 @@ public class DynamicDockerProvisionTest { tester.activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(ClusterSpec.Type.content, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); tester.activate(app1, cluster1, Capacity.from(resources(4, 2, 2, 10, 200, fast, local), @@ -327,7 +327,7 @@ public class DynamicDockerProvisionTest { tester.activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(ClusterSpec.Type.content, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); tester.activate(app1, cluster1, Capacity.from(resources(4, 2, 2, 10, 200, fast, StorageType.any), diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/InPlaceResizeProvisionTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/InPlaceResizeProvisionTest.java index 71c3ec37d65..7a21b4eb7af 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/InPlaceResizeProvisionTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/InPlaceResizeProvisionTest.java @@ -60,8 +60,8 @@ public class InPlaceResizeProvisionTest { private final ProvisioningTester tester = new ProvisioningTester.Builder() .flagSource(flagSource) .zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - private final ApplicationId infraApp = tester.makeApplicationId(); - private final ApplicationId app = tester.makeApplicationId(); + private final ApplicationId infraApp = ProvisioningTester.makeApplicationId(); + private final ApplicationId app = ProvisioningTester.makeApplicationId(); @Test public void single_group_same_cluster_size_resource_increase() { @@ -128,7 +128,7 @@ public class InPlaceResizeProvisionTest { addParentHosts(4, new NodeResources(8, 16, 320, 8, fast, local)); // Allocate 2 nodes for one app that leaves exactly enough capacity for mediumResources left on the host - new PrepareHelper(tester, tester.makeApplicationId()).prepare(container1, 2, 1, mediumResources).activate(); + new PrepareHelper(tester, ProvisioningTester.makeApplicationId()).prepare(container1, 2, 1, mediumResources).activate(); // Allocate 4 nodes for another app. After this, 2 hosts should be completely full new PrepareHelper(tester, app).prepare(container1, 4, 1, mediumResources).activate(); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/MultigroupProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/MultigroupProvisioningTest.java index 09d1600e1d7..aba7ac2a530 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/MultigroupProvisioningTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/MultigroupProvisioningTest.java @@ -42,7 +42,7 @@ public class MultigroupProvisioningTest { public void test_provisioning_of_multiple_groups() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application1 = tester.makeApplicationId("app1"); + ApplicationId application1 = ProvisioningTester.makeApplicationId("app1"); tester.makeReadyNodes(31, small); @@ -75,7 +75,7 @@ public class MultigroupProvisioningTest { public void test_provisioning_of_groups_with_asymmetry() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); tester.makeReadyNodes(21, large); @@ -88,7 +88,7 @@ public class MultigroupProvisioningTest { public void test_provisioning_of_multiple_groups_after_flavor_migration() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application1 = tester.makeApplicationId("app1"); + ApplicationId application1 = ProvisioningTester.makeApplicationId("app1"); tester.makeReadyNodes(10, small); tester.makeReadyNodes(16, large); @@ -102,7 +102,7 @@ public class MultigroupProvisioningTest { public void test_one_node_and_group_to_two() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.perf, RegionName.from("us-east"))).build(); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); tester.makeReadyNodes(10, small); @@ -114,7 +114,7 @@ public class MultigroupProvisioningTest { public void test_one_node_and_group_to_two_with_resource_change() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.perf, RegionName.from("us-east"))).build(); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); tester.makeReadyNodes(10, small); tester.makeReadyNodes(10, large); @@ -136,7 +136,7 @@ public class MultigroupProvisioningTest { .build(); tester.makeReadyHosts(6, hostFlavor.resources()).activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(ClusterSpec.Type.content, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); // Deploy with 1 group @@ -162,7 +162,7 @@ public class MultigroupProvisioningTest { .build(); tester.makeReadyHosts(6, hostFlavor.resources()).activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(ClusterSpec.Type.content, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); // Deploy with 3 groups @@ -192,7 +192,7 @@ public class MultigroupProvisioningTest { .build(); tester.makeReadyHosts(12, hostFlavor.resources()).activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(ClusterSpec.Type.content, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); // Deploy with 3 groups @@ -220,7 +220,7 @@ public class MultigroupProvisioningTest { public void test_provisioning_of_multiple_groups_after_flavor_migration_and_exiration() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application1 = tester.makeApplicationId("app1"); + ApplicationId application1 = ProvisioningTester.makeApplicationId("app1"); tester.makeReadyNodes(10, small); tester.makeReadyNodes(16, large); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java index 11e7af512c3..d9265153596 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java @@ -33,7 +33,7 @@ public class NodeTypeProvisioningTest { private final ProvisioningTester tester = new ProvisioningTester.Builder().build(); - private final ApplicationId application = tester.makeApplicationId(); // application using proxy nodes + private final ApplicationId application = ProvisioningTester.makeApplicationId(); // application using proxy nodes private final Capacity capacity = Capacity.fromRequiredNodeType(NodeType.proxy); private final ClusterSpec clusterSpec = ClusterSpec.request(ClusterSpec.Type.container, ClusterSpec.Id.from("test")).vespaVersion("6.42").build(); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java index 6cf5a2c8342..1c03bbc6a56 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java @@ -59,8 +59,8 @@ public class ProvisioningTest { public void application_deployment_constant_application_size() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application1 = tester.makeApplicationId(); - ApplicationId application2 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); + ApplicationId application2 = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(21, defaultResources).activateTenantHosts(); @@ -141,10 +141,10 @@ public class ProvisioningTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.dev, RegionName.from("us-east"))).build(); tester.makeReadyHosts(4, defaultResources); - tester.prepareAndActivateInfraApplication(tester.makeApplicationId(), NodeType.host); + tester.prepareAndActivateInfraApplication(ProvisioningTester.makeApplicationId(), NodeType.host); // deploy - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); SystemState state1 = prepare(application1, 1, 1, 1, 1, defaultResources, tester); tester.activate(application1, state1.allHosts); @@ -166,10 +166,10 @@ public class ProvisioningTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.dev, RegionName.from("us-east"))).build(); tester.makeReadyHosts(4, defaultResources); - tester.prepareAndActivateInfraApplication(tester.makeApplicationId(), NodeType.host); + tester.prepareAndActivateInfraApplication(ProvisioningTester.makeApplicationId(), NodeType.host); // deploy - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); SystemState state1 = prepare(application1, tester, 1, 1, 1, 1, defaultResources, "1.2.3"); String dockerImageRepo = "docker.domain.tld/my/image"; prepare(application1, tester, 1, 1, 1 , 1 , false, defaultResources, "1.2.3", Optional.of(dockerImageRepo)); @@ -193,7 +193,7 @@ public class ProvisioningTest { public void application_deployment_variable_application_size() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(24, defaultResources); tester.activateTenantHosts(); @@ -264,7 +264,7 @@ public class ProvisioningTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(12, small); tester.activateTenantHosts(); @@ -299,7 +299,7 @@ public class ProvisioningTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(12, small); tester.makeReadyHosts(12, large); @@ -317,7 +317,7 @@ public class ProvisioningTest { public void application_deployment_above_then_at_capacity_limit() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(5, defaultResources).activateTenantHosts(); @@ -343,9 +343,9 @@ public class ProvisioningTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.dev, RegionName.from("us-east"))).build(); tester.makeReadyHosts(4, defaultResources); - tester.prepareAndActivateInfraApplication(tester.makeApplicationId(), NodeType.host); + tester.prepareAndActivateInfraApplication(ProvisioningTester.makeApplicationId(), NodeType.host); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); SystemState state = prepare(application, 2, 2, 3, 3, defaultResources, tester); assertEquals(4, state.allHosts.size()); tester.activate(application, state.allHosts); @@ -356,8 +356,8 @@ public class ProvisioningTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); tester.makeReadyHosts(13, defaultResources).activateTenantHosts(); - tester.prepareAndActivateInfraApplication(tester.makeApplicationId(), NodeType.host); - ApplicationId application = tester.makeApplicationId(); + tester.prepareAndActivateInfraApplication(ProvisioningTester.makeApplicationId(), NodeType.host); + ApplicationId application = ProvisioningTester.makeApplicationId(); { // Deploy with disk-speed any and make sure that information is retained @@ -399,9 +399,9 @@ public class ProvisioningTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.dev, RegionName.from("us-east"))).build(); tester.makeReadyHosts(4, defaultResources).activateTenantHosts(); - tester.prepareAndActivateInfraApplication(tester.makeApplicationId(), NodeType.host); + tester.prepareAndActivateInfraApplication(ProvisioningTester.makeApplicationId(), NodeType.host); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); SystemState state = prepare(application, tester, 2, 2, 3, 3, defaultResources, "6.91"); assertEquals(4, state.allHosts.size()); tester.activate(application, state.allHosts); @@ -412,9 +412,9 @@ public class ProvisioningTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.dev, RegionName.from("us-east"))).build(); tester.makeReadyHosts(4, defaultResources).activateTenantHosts(); - tester.prepareAndActivateInfraApplication(tester.makeApplicationId(), NodeType.host); + tester.prepareAndActivateInfraApplication(ProvisioningTester.makeApplicationId(), NodeType.host); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); String dockerImageRepo = "docker.domain.tld/my/image"; SystemState state = prepare(application, tester, 2, 2, 3, 3, false, defaultResources, "6.91", Optional.of(dockerImageRepo)); assertEquals(4, state.allHosts.size()); @@ -425,8 +425,9 @@ public class ProvisioningTest { public void test_deployment_size() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.test, RegionName.from("us-east"))).build(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(4, defaultResources).activateTenantHosts(); + SystemState state = prepare(application, 2, 2, 3, 3, defaultResources, tester); assertEquals(4, state.allHosts.size()); tester.activate(application, state.allHosts); @@ -440,7 +441,7 @@ public class ProvisioningTest { .build(); tester.makeReadyHosts(4, hostFlavor.resources()).activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(ClusterSpec.Type.content, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); tester.activate(app1, cluster1, Capacity.from(new ClusterResources(2, 1, NodeResources.unspecified()), @@ -458,7 +459,7 @@ public class ProvisioningTest { .build(); tester.makeReadyHosts(31, hostFlavor.resources()).activateTenantHosts(); - ApplicationId app1 = tester.makeApplicationId("app1"); + ApplicationId app1 = ProvisioningTester.makeApplicationId("app1"); ClusterSpec cluster1 = ClusterSpec.request(ClusterSpec.Type.content, new ClusterSpec.Id("cluster1")).vespaVersion("7").build(); // Initial deployment @@ -515,7 +516,7 @@ public class ProvisioningTest { public void prod_deployment_requires_redundancy() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(10, defaultResources).activateTenantHosts(); prepare(application, 1, 2, 3, 3, defaultResources, tester); } @@ -524,7 +525,7 @@ public class ProvisioningTest { public void below_memory_resource_limit() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(10, defaultResources).activateTenantHosts(); try { prepare(application, 2, 2, 3, 3, @@ -539,7 +540,7 @@ public class ProvisioningTest { public void below_vcpu_resource_limit() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(10, defaultResources).activateTenantHosts(); try { prepare(application, 2, 2, 3, 3, @@ -556,9 +557,9 @@ public class ProvisioningTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.dev, RegionName.from("us-east"))).build(); tester.makeReadyHosts(4, new NodeResources(2, 4, 10, 2)); - tester.prepareAndActivateInfraApplication(tester.makeApplicationId(), NodeType.host); + tester.prepareAndActivateInfraApplication(ProvisioningTester.makeApplicationId(), NodeType.host); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); SystemState state = prepare(application, 2, 2, 3, 3, new NodeResources(2, 4, 10, 2), tester); assertEquals(4, state.allHosts.size()); @@ -572,7 +573,7 @@ public class ProvisioningTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.test, RegionName.from("us-east"))).build(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(4, large).activateTenantHosts(); SystemState state = prepare(application, 2, 2, 3, 3, large, tester); assertEquals(4, state.allHosts.size()); @@ -583,7 +584,7 @@ public class ProvisioningTest { public void staging_deployment_size() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.staging, RegionName.from("us-east"))).build(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(14, defaultResources).activateTenantHosts(); SystemState state = prepare(application, 1, 1, 1, 64, defaultResources, tester); // becomes 1, 1, 1, 1, 6 assertEquals(9, state.allHosts.size()); @@ -595,7 +596,7 @@ public class ProvisioningTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); tester.makeReadyHosts(10, defaultResources).activateTenantHosts(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); SystemState state = prepare(application, 2, 2, 3, 3, defaultResources, tester); // Simulate expiry @@ -617,7 +618,7 @@ public class ProvisioningTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); tester.makeReadyHosts(9, defaultResources).activateTenantHosts(); // need 2+2+3+3=10 - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); try { prepare(application, 2, 2, 3, 3, defaultResources, tester); fail("Expected exception"); @@ -634,7 +635,7 @@ public class ProvisioningTest { RegionName.from("us-east"))).build(); tester.makeReadyHosts(13, defaultResources).activateTenantHosts(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); try { prepare(application, 2, 2, 6, 3, defaultResources, tester); fail("Expected exception"); @@ -652,7 +653,7 @@ public class ProvisioningTest { RegionName.from("us-east"))).build(); tester.makeReadyHosts(13, defaultResources).activateTenantHosts(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); prepare(application, 2, 2, 6, 3, defaultResources, tester); } @@ -660,7 +661,7 @@ public class ProvisioningTest { public void out_of_capacity_but_cannot_fail() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); tester.makeReadyHosts(4, defaultResources).activateTenantHosts(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); ClusterSpec cluster = ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("music")).vespaVersion("4.5.6").build(); tester.prepare(application, cluster, Capacity.from(new ClusterResources(5, 1, NodeResources.unspecified()), false, false)); // No exception; Success @@ -670,7 +671,7 @@ public class ProvisioningTest { public void out_of_capacity_all_nodes_want_to_retire() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); // Flag all nodes for retirement List<Node> readyNodes = tester.makeReadyNodes(5, defaultResources); tester.patchNodes(readyNodes, (node) -> node.withWantToRetire(true, Agent.system, tester.clock().instant())); @@ -690,7 +691,7 @@ public class ProvisioningTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); // Create 10 nodes tester.makeReadyHosts(10, defaultResources).activateTenantHosts(); @@ -722,7 +723,7 @@ public class ProvisioningTest { public void highest_node_indexes_are_retired_first() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application1 = tester.makeApplicationId(); + ApplicationId application1 = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(14, defaultResources).activateTenantHosts(); @@ -753,7 +754,7 @@ public class ProvisioningTest { .spareCount(1).build(); tester.makeReadyHosts(7, defaultResources).activateTenantHosts(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); ClusterSpec spec = ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("content1")).vespaVersion("7.1.2").build(); tester.deploy(application, spec, Capacity.from(new ClusterResources(6, 1, defaultResources))); @@ -773,7 +774,7 @@ public class ProvisioningTest { public void application_deployment_retires_nodes_that_want_to_retire() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(10, defaultResources).activateTenantHosts(); // Deploy application @@ -801,7 +802,7 @@ public class ProvisioningTest { public void application_deployment_extends_existing_reservations_on_deploy() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(2, defaultResources).activateTenantHosts(); // Deploy fails with out of capacity @@ -841,7 +842,7 @@ public class ProvisioningTest { @Test public void required_capacity_respects_prod_redundancy_requirement() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); try { prepare(application, tester, 1, 0, 1, 0, true, defaultResources, "6.42", Optional.empty()); fail("Expected exception"); @@ -853,9 +854,9 @@ public class ProvisioningTest { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(SystemName.dev, Environment.dev, RegionName.from("no-central"))).build(); tester.makeReadyNodes(4, defaultResources, NodeType.devhost, 1); - tester.prepareAndActivateInfraApplication(tester.makeApplicationId(), NodeType.devhost); + tester.prepareAndActivateInfraApplication(ProvisioningTester.makeApplicationId(), NodeType.devhost); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); SystemState state = prepare(application, 2, 2, 3, 3, defaultResources, tester); assertEquals(4, state.allHosts.size()); tester.activate(application, state.allHosts); @@ -864,7 +865,7 @@ public class ProvisioningTest { @Test public void cluster_spec_update_for_already_reserved_nodes() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.dev, RegionName.from("us-east"))).build(); - ApplicationId application = tester.makeApplicationId(); + ApplicationId application = ProvisioningTester.makeApplicationId(); String version1 = "6.42"; String version2 = "6.43"; tester.makeReadyNodes(2, defaultResources); @@ -881,7 +882,7 @@ public class ProvisioningTest { @Test public void change_to_and_from_combined_cluster_does_not_change_node_allocation() { var tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - var application = tester.makeApplicationId(); + var application = ProvisioningTester.makeApplicationId(); tester.makeReadyHosts(4, defaultResources).activateTenantHosts(); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTester.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTester.java index 4c8d5caad43..078b1d372df 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTester.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTester.java @@ -321,14 +321,14 @@ public class ProvisioningTester { return removed; } - public ApplicationId makeApplicationId() { + public static ApplicationId makeApplicationId() { return ApplicationId.from( TenantName.from(UUID.randomUUID().toString()), ApplicationName.from(UUID.randomUUID().toString()), InstanceName.from(UUID.randomUUID().toString())); } - public ApplicationId makeApplicationId(String applicationName) { + public static ApplicationId makeApplicationId(String applicationName) { return ApplicationId.from("tenant", applicationName, "default"); } @@ -518,11 +518,11 @@ public class ProvisioningTester { activate(applicationId, Set.copyOf(list)); } - public ClusterSpec containerClusterSpec() { + public static ClusterSpec containerClusterSpec() { return ClusterSpec.request(ClusterSpec.Type.container, ClusterSpec.Id.from("test")).vespaVersion("6.42").build(); } - public ClusterSpec contentClusterSpec() { + public static ClusterSpec contentClusterSpec() { return ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("test")).vespaVersion("6.42").build(); } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/VirtualNodeProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/VirtualNodeProvisioningTest.java index 0ef7071b095..b5e31e7cbdb 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/VirtualNodeProvisioningTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/VirtualNodeProvisioningTest.java @@ -39,7 +39,7 @@ public class VirtualNodeProvisioningTest { private static final ClusterSpec containerClusterSpec = ClusterSpec.request(ClusterSpec.Type.container, ClusterSpec.Id.from("myContainer")).vespaVersion("6.42").build(); private ProvisioningTester tester = new ProvisioningTester.Builder().build(); - private ApplicationId applicationId = tester.makeApplicationId(); + private ApplicationId applicationId = ProvisioningTester.makeApplicationId(); @Test public void distinct_parent_host_for_each_node_in_a_cluster() { @@ -84,7 +84,7 @@ public class VirtualNodeProvisioningTest { NodeResources flavor = new NodeResources(1, 4, 10, 1); tester = new ProvisioningTester.Builder().zone(new Zone(Environment.dev, RegionName.from("us-east"))).build(); tester.makeReadyNodes(4, flavor, NodeType.host, 1); - tester.prepareAndActivateInfraApplication(tester.makeApplicationId(), NodeType.host); + tester.prepareAndActivateInfraApplication(ProvisioningTester.makeApplicationId(), NodeType.host); List<HostSpec> containerHosts = prepare(containerClusterSpec, containerNodeCount, groups, flavor); List<HostSpec> contentHosts = prepare(contentClusterSpec, contentNodeCount, groups, flavor); @@ -98,7 +98,7 @@ public class VirtualNodeProvisioningTest { { tester = new ProvisioningTester.Builder().zone(new Zone(SystemName.cd, Environment.prod, RegionName.from("us-east"))).build(); tester.makeReadyNodes(4, flavor, NodeType.host, 1); - tester.prepareAndActivateInfraApplication(tester.makeApplicationId(), NodeType.host); + tester.prepareAndActivateInfraApplication(ProvisioningTester.makeApplicationId(), NodeType.host); List<HostSpec> containerHosts = prepare(containerClusterSpec, containerNodeCount, groups); List<HostSpec> contentHosts = prepare(contentClusterSpec, contentNodeCount, groups); diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt index 4ced3fe173b..897073397ef 100644 --- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt +++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt @@ -5,6 +5,7 @@ vespa_add_executable(searchcore_vespa_feed_bm_app spi_bm_feed_handler.cpp storage_api_rpc_bm_feed_handler.cpp storage_api_chain_bm_feed_handler.cpp + storage_reply_error_checker.cpp OUTPUT_NAME vespa-feed-bm DEPENDS searchcore_server diff --git a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h index a3341bf14c9..81a08552c0c 100644 --- a/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/i_bm_feed_handler.h @@ -25,6 +25,7 @@ public: virtual void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) = 0; virtual void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) = 0; virtual void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) = 0; + virtual uint32_t get_error_count() const = 0; }; } diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp index d53ece2fc42..2df97841bb9 100644 --- a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp +++ b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.cpp @@ -24,16 +24,18 @@ storage::spi::Context context(default_load_type, storage::spi::Priority(0), stor class MyOperationComplete : public storage::spi::OperationComplete { + std::atomic<uint32_t> &_errors; PendingTracker& _tracker; public: - MyOperationComplete(PendingTracker& tracker); + MyOperationComplete(std::atomic<uint32_t> &errors, PendingTracker& tracker); ~MyOperationComplete(); void onComplete(std::unique_ptr<storage::spi::Result> result) override; void addResultHandler(const storage::spi::ResultHandler* resultHandler) override; }; -MyOperationComplete::MyOperationComplete(PendingTracker& tracker) - : _tracker(tracker) +MyOperationComplete::MyOperationComplete(std::atomic<uint32_t> &errors, PendingTracker& tracker) + : _errors(errors), + _tracker(tracker) { _tracker.retain(); } @@ -46,7 +48,9 @@ MyOperationComplete::~MyOperationComplete() void MyOperationComplete::onComplete(std::unique_ptr<storage::spi::Result> result) { - (void) result; + if (result->hasError()) { + ++_errors; + } } void @@ -59,7 +63,8 @@ MyOperationComplete::addResultHandler(const storage::spi::ResultHandler * result SpiBmFeedHandler::SpiBmFeedHandler(PersistenceProvider& provider) : IBmFeedHandler(), - _provider(provider) + _provider(provider), + _errors(0u) { } @@ -68,19 +73,19 @@ SpiBmFeedHandler::~SpiBmFeedHandler() = default; void SpiBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) { - _provider.putAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), std::move(document), context, std::make_unique<MyOperationComplete>(tracker)); + _provider.putAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), std::move(document), context, std::make_unique<MyOperationComplete>(_errors, tracker)); } void SpiBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) { - _provider.updateAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), std::move(document_update), context, std::make_unique<MyOperationComplete>(tracker)); + _provider.updateAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), std::move(document_update), context, std::make_unique<MyOperationComplete>(_errors, tracker)); } void SpiBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) { - _provider.removeAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(tracker)); + _provider.removeAsync(Bucket(bucket, PartitionId(0)), Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(_errors, tracker)); } @@ -90,4 +95,9 @@ SpiBmFeedHandler::create_bucket(const document::Bucket& bucket) _provider.createBucket(Bucket(bucket, PartitionId(0)), context); } +uint32_t +SpiBmFeedHandler::get_error_count() const +{ + return _errors; +} } diff --git a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h index 5b56a4f21dd..bda41898510 100644 --- a/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/spi_bm_feed_handler.h @@ -3,6 +3,7 @@ #pragma once #include "i_bm_feed_handler.h" +#include <atomic> namespace storage::spi { struct PersistenceProvider; } @@ -14,6 +15,7 @@ namespace feedbm { class SpiBmFeedHandler : public IBmFeedHandler { storage::spi::PersistenceProvider& _provider; + std::atomic<uint32_t> _errors; public: SpiBmFeedHandler(storage::spi::PersistenceProvider& provider); ~SpiBmFeedHandler(); @@ -21,6 +23,7 @@ public: void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; void create_bucket(const document::Bucket& bucket); + uint32_t get_error_count() const override; }; } diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp index 6f1acd10fe4..ba16f226ebb 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp @@ -2,6 +2,7 @@ #include "storage_api_chain_bm_feed_handler.h" #include "pending_tracker.h" +#include "storage_reply_error_checker.h" #include <vespa/document/fieldvalue/document.h> #include <vespa/document/update/documentupdate.h> #include <vespa/storageapi/messageapi/storagemessage.h> @@ -33,7 +34,8 @@ std::shared_ptr<storage::api::StorageCommand> make_set_cluster_state_cmd() { } -class BmStorageLink : public StorageLink +class BmStorageLink : public StorageLink, + public StorageReplyErrorChecker { std::mutex _mutex; vespalib::hash_map<uint64_t, PendingTracker *> _pending; @@ -65,6 +67,7 @@ public: BmStorageLink::BmStorageLink() : storage::StorageLink("vespa-bm-feed"), + StorageReplyErrorChecker(), _mutex(), _pending() { @@ -86,6 +89,7 @@ BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) bool BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) { + check_error(*msg); return release(msg->getMsgId()); } @@ -182,4 +186,10 @@ StorageApiChainBmFeedHandler::get_storage_chain_builder(std::shared_ptr<Context> return std::make_unique<MyStorageChainBuilder>(std::move(context)); } +uint32_t +StorageApiChainBmFeedHandler::get_error_count() const +{ + return _context->bm_link->get_error_count(); +} + } diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h index 521deddd19e..c3fdab6cd76 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h @@ -29,6 +29,7 @@ public: static std::shared_ptr<Context> get_context(); static std::unique_ptr<storage::IStorageChainBuilder> get_storage_chain_builder(std::shared_ptr<Context> context); + uint32_t get_error_count() const override; }; } diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp index c8d73444652..3a974bb7d9a 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp @@ -2,6 +2,7 @@ #include "storage_api_rpc_bm_feed_handler.h" #include "pending_tracker.h" +#include "storage_reply_error_checker.h" #include <vespa/document/fieldvalue/document.h> #include <vespa/document/update/documentupdate.h> #include <vespa/documentapi/loadtypes/loadtypeset.h> @@ -58,22 +59,26 @@ set_cluster_up(SharedRpcResources &shared_rpc_resources, storage::api::StorageMe } -class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher +class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher, + public StorageReplyErrorChecker { std::mutex _mutex; vespalib::hash_map<uint64_t, PendingTracker *> _pending; public: MyMessageDispatcher() : storage::MessageDispatcher(), + StorageReplyErrorChecker(), _mutex(), _pending() { } ~MyMessageDispatcher() override; void dispatch_sync(std::shared_ptr<storage::api::StorageMessage> msg) override { + check_error(*msg); release(msg->getMsgId()); } void dispatch_async(std::shared_ptr<storage::api::StorageMessage> msg) override { + check_error(*msg); release(msg->getMsgId()); } void retain(uint64_t msg_id, PendingTracker &tracker) { @@ -143,4 +148,10 @@ StorageApiRpcBmFeedHandler::remove(const document::Bucket& bucket, const Documen send_rpc(std::move(cmd), tracker); } +uint32_t +StorageApiRpcBmFeedHandler::get_error_count() const +{ + return _message_dispatcher->get_error_count(); +} + } diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h index 0fe92350eb2..bc2f62e038f 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h @@ -38,6 +38,7 @@ public: void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; + uint32_t get_error_count() const override; }; } diff --git a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp new file mode 100644 index 00000000000..260b0c8a7af --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.cpp @@ -0,0 +1,33 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "storage_reply_error_checker.h" +#include <vespa/storageapi/messageapi/storagereply.h> + +#include <vespa/log/log.h> +LOG_SETUP(".storage_reply_error_checker"); + +namespace feedbm { + +StorageReplyErrorChecker::StorageReplyErrorChecker() + : _errors(0u) +{ +} + +StorageReplyErrorChecker::~StorageReplyErrorChecker() = default; + +void +StorageReplyErrorChecker::check_error(const storage::api::StorageMessage &msg) +{ + auto reply = dynamic_cast<const storage::api::StorageReply*>(&msg); + if (reply != nullptr) { + if (reply->getResult().failed()) { + if (++_errors <= 10) { + LOG(info, "reply '%s', return code '%s'", reply->toString().c_str(), reply->getResult().toString().c_str()); + } + } + } else { + ++_errors; + } +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h new file mode 100644 index 00000000000..78004f3d787 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h @@ -0,0 +1,20 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <atomic> + +namespace storage::api { class StorageMessage; } + +namespace feedbm { + +class StorageReplyErrorChecker { + std::atomic<uint32_t> _errors; +public: + StorageReplyErrorChecker(); + ~StorageReplyErrorChecker(); + void check_error(const storage::api::StorageMessage &msg); + uint32_t get_error_count() const { return _errors; } +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 73d741c3fee..5b9f7a58293 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -62,6 +62,9 @@ #include <vespa/messagebus/config-messagebus.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> +#include <vespa/storageserver/app/distributorprocess.h> +#include <vespa/storage/config/config-stor-distributormanager.h> +#include <vespa/storage/config/config-stor-visitordispatcher.h> #include <getopt.h> #include <iostream> @@ -87,9 +90,11 @@ using vespa::config::content::core::StorOpsloggerConfigBuilder; using vespa::config::content::core::StorPrioritymappingConfigBuilder; using vespa::config::content::LoadTypeConfigBuilder; using vespa::config::content::UpgradingConfigBuilder; +using vespa::config::content::core::StorDistributormanagerConfigBuilder; using vespa::config::content::core::StorServerConfigBuilder; using vespa::config::content::core::StorStatusConfigBuilder; using vespa::config::content::core::StorVisitorConfigBuilder; +using vespa::config::content::core::StorVisitordispatcherConfigBuilder; using metrics::MetricsmanagerConfigBuilder; using cloud::config::SlobroksConfigBuilder; using messagebus::MessagebusConfigBuilder; @@ -190,6 +195,23 @@ struct MyResourceWriteFilter : public IResourceWriteFilter State getAcceptState() const override { return IResourceWriteFilter::State(); } }; +class BucketSelector +{ + uint32_t _thread_id; + uint32_t _threads; + uint32_t _num_buckets; +public: + BucketSelector(uint32_t thread_id_in, uint32_t threads_in, uint32_t num_buckets_in) + : _thread_id(thread_id_in), + _threads(threads_in), + _num_buckets((num_buckets_in / _threads) * _threads) + { + } + uint64_t operator()(uint32_t i) { + return (static_cast<uint64_t>(i) * _threads + _thread_id) % _num_buckets; + } +}; + class BMRange { uint32_t _start; @@ -211,6 +233,7 @@ class BMParams { uint32_t _update_passes; uint32_t _remove_passes; uint32_t _rpc_network_threads; + bool _enable_distributor; bool _enable_service_layer; bool _use_storage_chain; uint32_t get_start(uint32_t thread_id) const { @@ -224,6 +247,7 @@ public: _update_passes(1), _remove_passes(2), _rpc_network_threads(1), + _enable_distributor(false), _enable_service_layer(false), _use_storage_chain(false) { @@ -237,6 +261,7 @@ public: uint32_t get_update_passes() const { return _update_passes; } uint32_t get_remove_passes() const { return _remove_passes; } uint32_t get_rpc_network_threads() const { return _rpc_network_threads; } + bool get_enable_distributor() const { return _enable_distributor; } bool get_enable_service_layer() const { return _enable_service_layer; } bool get_use_storage_chain() const { return _use_storage_chain; } void set_documents(uint32_t documents_in) { _documents = documents_in; } @@ -245,6 +270,7 @@ public: void set_update_passes(uint32_t update_passes_in) { _update_passes = update_passes_in; } void set_remove_passes(uint32_t remove_passes_in) { _remove_passes = remove_passes_in; } void set_rpc_network_threads(uint32_t threads_in) { _rpc_network_threads = threads_in; } + void set_enable_distributor(bool enable_distributor_in) { _enable_distributor = enable_distributor_in; } void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; } void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; } bool check() const; @@ -273,6 +299,10 @@ BMParams::check() const std::cerr << "Too few rpc network threads: " << _rpc_network_threads << std::endl; return false; } + if (_enable_distributor && !_enable_service_layer) { + std::cerr << "Service layer must be enabled if distributor layer is enabled" << std::endl; + return false; + } return true; } @@ -322,39 +352,31 @@ struct MyStorageConfig { vespalib::string config_id; DocumenttypesConfigBuilder documenttypes; - PersistenceConfigBuilder persistence; StorDistributionConfigBuilder stor_distribution; - StorFilestorConfigBuilder stor_filestor; StorBouncerConfigBuilder stor_bouncer; StorCommunicationmanagerConfigBuilder stor_communicationmanager; - StorBucketInitConfigBuilder stor_bucket_init; StorOpsloggerConfigBuilder stor_opslogger; StorPrioritymappingConfigBuilder stor_prioritymapping; UpgradingConfigBuilder upgrading; StorServerConfigBuilder stor_server; StorStatusConfigBuilder stor_status; - StorVisitorConfigBuilder stor_visitor; BucketspacesConfigBuilder bucketspaces; LoadTypeConfigBuilder load_type; MetricsmanagerConfigBuilder metricsmanager; SlobroksConfigBuilder slobroks; MessagebusConfigBuilder messagebus; - MyStorageConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int status_port, uint32_t rpc_network_threads) + MyStorageConfig(bool distributor, const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int mbus_port, int rpc_port, int status_port, int rpc_network_threads) : config_id(config_id_in), documenttypes(documenttypes_in), - persistence(), stor_distribution(), - stor_filestor(), stor_bouncer(), stor_communicationmanager(), - stor_bucket_init(), stor_opslogger(), stor_prioritymapping(), upgrading(), stor_server(), stor_status(), - stor_visitor(), bucketspaces(), load_type(), metricsmanager(), @@ -379,7 +401,12 @@ struct MyStorageConfig dc.redundancy = 1; dc.readyCopies = 1; } - stor_server.rootFolder = "storage"; + stor_server.isDistributor = distributor; + if (distributor) { + stor_server.rootFolder = "distributor"; + } else { + stor_server.rootFolder = "storage"; + } { SlobroksConfigBuilder::Slobrok slobrok; slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); @@ -387,6 +414,9 @@ struct MyStorageConfig } stor_communicationmanager.useDirectStorageapiRpc = true; stor_communicationmanager.rpc.numNetworkThreads = rpc_network_threads; + stor_communicationmanager.mbusport = mbus_port; + stor_communicationmanager.rpcport = rpc_port; + stor_status.httpport = status_port; } @@ -394,18 +424,14 @@ struct MyStorageConfig void add_builders(ConfigSet &set) { set.addBuilder(config_id, &documenttypes); - set.addBuilder(config_id, &persistence); set.addBuilder(config_id, &stor_distribution); - set.addBuilder(config_id, &stor_filestor); set.addBuilder(config_id, &stor_bouncer); set.addBuilder(config_id, &stor_communicationmanager); - set.addBuilder(config_id, &stor_bucket_init); set.addBuilder(config_id, &stor_opslogger); set.addBuilder(config_id, &stor_prioritymapping); set.addBuilder(config_id, &upgrading); set.addBuilder(config_id, &stor_server); set.addBuilder(config_id, &stor_status); - set.addBuilder(config_id, &stor_visitor); set.addBuilder(config_id, &bucketspaces); set.addBuilder(config_id, &load_type); set.addBuilder(config_id, &metricsmanager); @@ -416,6 +442,58 @@ struct MyStorageConfig MyStorageConfig::~MyStorageConfig() = default; +struct MyServiceLayerConfig : public MyStorageConfig +{ + PersistenceConfigBuilder persistence; + StorFilestorConfigBuilder stor_filestor; + StorBucketInitConfigBuilder stor_bucket_init; + StorVisitorConfigBuilder stor_visitor; + + MyServiceLayerConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int mbus_port, int rpc_port, int status_port, uint32_t rpc_network_threads) + : MyStorageConfig(false, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, rpc_network_threads), + persistence(), + stor_filestor(), + stor_bucket_init(), + stor_visitor() + { + } + + ~MyServiceLayerConfig(); + + void add_builders(ConfigSet &set) { + MyStorageConfig::add_builders(set); + set.addBuilder(config_id, &persistence); + set.addBuilder(config_id, &stor_filestor); + set.addBuilder(config_id, &stor_bucket_init); + set.addBuilder(config_id, &stor_visitor); + } +}; + +MyServiceLayerConfig::~MyServiceLayerConfig() = default; + +struct MyDistributorConfig : public MyStorageConfig +{ + StorDistributormanagerConfigBuilder stor_distributormanager; + StorVisitordispatcherConfigBuilder stor_visitordispatcher; + + MyDistributorConfig(const vespalib::string& config_id_in, const DocumenttypesConfig& documenttypes_in, int slobrok_port, int mbus_port, int rpc_port, int status_port, int rpc_network_threads) + : MyStorageConfig(true, config_id_in, documenttypes_in, slobrok_port, mbus_port, rpc_port, status_port, rpc_network_threads), + stor_distributormanager(), + stor_visitordispatcher() + { + } + + ~MyDistributorConfig(); + + void add_builders(ConfigSet &set) { + MyStorageConfig::add_builders(set); + set.addBuilder(config_id, &stor_distributormanager); + set.addBuilder(config_id, &stor_visitordispatcher); + } +}; + +MyDistributorConfig::~MyDistributorConfig() = default; + struct MyRpcClientConfig { vespalib::string config_id; SlobroksConfigBuilder slobroks; @@ -452,8 +530,13 @@ struct PersistenceProviderFixture { DummyFileHeaderContext _file_header_context; int _tls_listen_port; int _slobrok_port; - int _status_port; int _rpc_client_port; + int _service_layer_mbus_port; + int _service_layer_rpc_port; + int _service_layer_status_port; + int _distributor_mbus_port; + int _distributor_rpc_port; + int _distributor_status_port; TransLogServer _tls; vespalib::string _tls_spec; matching::QueryLimiter _query_limiter; @@ -468,26 +551,33 @@ struct PersistenceProviderFixture { MyResourceWriteFilter _write_filter; std::shared_ptr<PersistenceEngine> _persistence_engine; uint32_t _bucket_bits; - MyStorageConfig _service_layer_config; + MyServiceLayerConfig _service_layer_config; + MyDistributorConfig _distributor_config; MyRpcClientConfig _rpc_client_config; ConfigSet _config_set; std::shared_ptr<IConfigContext> _config_context; std::unique_ptr<IBmFeedHandler> _feed_handler; std::unique_ptr<mbus::Slobrok> _slobrok; + std::shared_ptr<StorageApiChainBmFeedHandler::Context> _service_layer_chain_context; std::unique_ptr<MyServiceLayerProcess> _service_layer; std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources; + std::unique_ptr<storage::DistributorProcess> _distributor; PersistenceProviderFixture(const BMParams& params); ~PersistenceProviderFixture(); void create_document_db(); uint32_t num_buckets() const { return (1u << _bucket_bits); } - BucketId make_bucket_id(uint32_t i) const { return BucketId(_bucket_bits, i & (num_buckets() - 1)); } - document::Bucket make_bucket(uint32_t i) const { return document::Bucket(_bucket_space, BucketId(_bucket_bits, i & (num_buckets() - 1))); } - DocumentId make_document_id(uint32_t i) const; - std::unique_ptr<Document> make_document(uint32_t i) const; - std::unique_ptr<DocumentUpdate> make_document_update(uint32_t i) const; + BucketId make_bucket_id(uint32_t n) const { return BucketId(_bucket_bits, n & (num_buckets() - 1)); } + document::Bucket make_bucket(uint32_t n) const { return document::Bucket(_bucket_space, make_bucket_id(n)); } + DocumentId make_document_id(uint32_t n, uint32_t i) const; + std::unique_ptr<Document> make_document(uint32_t n, uint32_t i) const; + std::unique_ptr<DocumentUpdate> make_document_update(uint32_t n, uint32_t i) const; void create_buckets(); - void start_service_layer(bool use_storage_chain); + void start_service_layer(const BMParams& params); + void start_distributor(const BMParams& params); + void create_feed_handler(const BMParams& params); + void shutdown_feed_handler(); + void shutdown_distributor(); void shutdown_service_layer(); }; @@ -502,8 +592,13 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _file_header_context(), _tls_listen_port(9017), _slobrok_port(9018), - _status_port(9019), - _rpc_client_port(9020), + _rpc_client_port(9019), + _service_layer_mbus_port(9020), + _service_layer_rpc_port(9021), + _service_layer_status_port(9022), + _distributor_mbus_port(9023), + _distributor_rpc_port(9024), + _distributor_status_port(9025), _tls("tls", _tls_listen_port, _base_dir, _file_header_context), _tls_spec(vespalib::make_string("tcp/localhost:%d", _tls_listen_port)), _query_limiter(), @@ -518,20 +613,24 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _write_filter(), _persistence_engine(), _bucket_bits(16), - _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _status_port, params.get_rpc_network_threads()), + _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params.get_rpc_network_threads()), + _distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params.get_rpc_network_threads()), _rpc_client_config("bm-rpc-client", _slobrok_port), _config_set(), _config_context(std::make_shared<ConfigContext>(_config_set)), _feed_handler(), _slobrok(), + _service_layer_chain_context(), _service_layer(), - _rpc_client_shared_rpc_resources() + _rpc_client_shared_rpc_resources(), + _distributor() { create_document_db(); _persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, -1, false); auto proxy = std::make_shared<PersistenceHandlerProxy>(_document_db); _persistence_engine->putHandler(_persistence_engine->getWLock(), _bucket_space, _doc_type_name, proxy); _service_layer_config.add_builders(_config_set); + _distributor_config.add_builders(_config_set); _rpc_client_config.add_builders(_config_set); _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine); } @@ -591,16 +690,16 @@ PersistenceProviderFixture::create_document_db() } DocumentId -PersistenceProviderFixture::make_document_id(uint32_t i) const +PersistenceProviderFixture::make_document_id(uint32_t n, uint32_t i) const { - DocumentId id(vespalib::make_string("id::test:n=%u:%u", i & (num_buckets() - 1), i)); + DocumentId id(vespalib::make_string("id::test:n=%u:%u", n & (num_buckets() - 1), i)); return id; } std::unique_ptr<Document> -PersistenceProviderFixture::make_document(uint32_t i) const +PersistenceProviderFixture::make_document(uint32_t n, uint32_t i) const { - auto id = make_document_id(i); + auto id = make_document_id(n, i); auto document = std::make_unique<Document>(*_document_type, id); document->setRepo(*_repo); document->setFieldValue(_field, std::make_unique<IntFieldValue>(i)); @@ -608,9 +707,9 @@ PersistenceProviderFixture::make_document(uint32_t i) const } std::unique_ptr<DocumentUpdate> -PersistenceProviderFixture::make_document_update(uint32_t i) const +PersistenceProviderFixture::make_document_update(uint32_t n, uint32_t i) const { - auto id = make_document_id(i); + auto id = make_document_id(n, i); auto document_update = std::make_unique<DocumentUpdate>(*_repo, *_document_type, id); document_update->addUpdate(FieldUpdate(_field).addUpdate(AssignValueUpdate(IntFieldValue(15)))); return document_update; @@ -626,17 +725,16 @@ PersistenceProviderFixture::create_buckets() } void -PersistenceProviderFixture::start_service_layer(bool use_storage_chain) +PersistenceProviderFixture::start_service_layer(const BMParams& params) { LOG(info, "start slobrok"); _slobrok = std::make_unique<mbus::Slobrok>(_slobrok_port); LOG(info, "start service layer"); config::ConfigUri config_uri("bm-servicelayer", _config_context); std::unique_ptr<storage::IStorageChainBuilder> chain_builder; - std::shared_ptr<StorageApiChainBmFeedHandler::Context> context; - if (use_storage_chain) { - context = StorageApiChainBmFeedHandler::get_context(); - chain_builder = StorageApiChainBmFeedHandler::get_storage_chain_builder(context); + if (params.get_use_storage_chain()) { + _service_layer_chain_context = StorageApiChainBmFeedHandler::get_context(); + chain_builder = StorageApiChainBmFeedHandler::get_storage_chain_builder(_service_layer_chain_context); } _service_layer = std::make_unique<MyServiceLayerProcess>(config_uri, *_persistence_engine, @@ -648,17 +746,50 @@ PersistenceProviderFixture::start_service_layer(bool use_storage_chain) config::ConfigUri client_config_uri("bm-rpc-client", _config_context); _rpc_client_shared_rpc_resources = std::make_unique<SharedRpcResources>(client_config_uri, _rpc_client_port, 100); _rpc_client_shared_rpc_resources->start_server_and_register_slobrok("bm-rpc-client"); - if (use_storage_chain) { - _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(std::move(context)); - } else { - _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo); +} + +void +PersistenceProviderFixture::start_distributor(const BMParams& params) +{ + if (params.get_enable_distributor()) { + config::ConfigUri config_uri("bm-distributor", _config_context); + _distributor = std::make_unique<storage::DistributorProcess>(config_uri); + _distributor->setupConfig(100ms); + _distributor->createNode(); } } void -PersistenceProviderFixture::shutdown_service_layer() +PersistenceProviderFixture::create_feed_handler(const BMParams& params) +{ + if (params.get_enable_service_layer()) { + if (params.get_use_storage_chain()) { + _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context); + } else { + _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo); + } + } +} + +void +PersistenceProviderFixture::shutdown_feed_handler() { _feed_handler.reset(); +} + +void +PersistenceProviderFixture::shutdown_distributor() +{ + if (_distributor) { + LOG(info, "stop distributor"); + _distributor->getNode().requestShutdown("controlled shutdown"); + _distributor->shutdown(); + } +} + +void +PersistenceProviderFixture::shutdown_service_layer() +{ if (_rpc_client_shared_rpc_resources) { LOG(info, "stop rpc client shared resources"); _rpc_client_shared_rpc_resources->shutdown(); @@ -676,20 +807,21 @@ PersistenceProviderFixture::shutdown_service_layer() } vespalib::nbostream -make_put_feed(PersistenceProviderFixture &f, BMRange range) +make_put_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector) { vespalib::nbostream serialized_feed; LOG(debug, "make_put_feed([%u..%u))", range.get_start(), range.get_end()); for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - serialized_feed << f.make_bucket_id(i); - auto document = f.make_document(i); + auto n = bucket_selector(i); + serialized_feed << f.make_bucket_id(n); + auto document = f.make_document(n, i); document->serialize(serialized_feed); } return serialized_feed; } std::vector<vespalib::nbostream> -make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BMRange)> func, const vespalib::string &label) +make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, std::function<vespalib::nbostream(BMRange,BucketSelector)> func, uint32_t num_buckets, const vespalib::string &label) { LOG(info, "make_feed %s %u small documents", label.c_str(), bm_params.get_documents()); std::vector<vespalib::nbostream> serialized_feed_v; @@ -697,8 +829,9 @@ make_feed(vespalib::ThreadStackExecutor &executor, const BMParams &bm_params, st serialized_feed_v.resize(bm_params.get_threads()); for (uint32_t i = 0; i < bm_params.get_threads(); ++i) { auto range = bm_params.get_range(i); - executor.execute(makeLambdaTask([&serialized_feed_v, i, range, &func]() - { serialized_feed_v[i] = func(range); })); + BucketSelector bucket_selector(i, bm_params.get_threads(), num_buckets); + executor.execute(makeLambdaTask([&serialized_feed_v, i, range, &func, bucket_selector]() + { serialized_feed_v[i] = func(range, bucket_selector); })); } executor.sync(); auto end_time = std::chrono::steady_clock::now(); @@ -730,6 +863,7 @@ void run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream> &serialized_feed_v, const BMParams& bm_params) { LOG(info, "putAsync %u small documents, pass=%u", bm_params.get_documents(), pass); + uint32_t old_errors = f._feed_handler->get_error_count(); auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_threads(); ++i) { auto range = bm_params.get_range(i); @@ -739,18 +873,20 @@ run_put_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor executor.sync(); auto end_time = std::chrono::steady_clock::now(); std::chrono::duration<double> elapsed = end_time - start_time; - LOG(info, "%8.2f puts/s for pass=%u", bm_params.get_documents() / elapsed.count(), pass); + uint32_t new_errors = f._feed_handler->get_error_count() - old_errors; + LOG(info, "%8.2f puts/s, %u errors for pass=%u", bm_params.get_documents() / elapsed.count(), new_errors, pass); time_bias += bm_params.get_documents(); } vespalib::nbostream -make_update_feed(PersistenceProviderFixture &f, BMRange range) +make_update_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector) { vespalib::nbostream serialized_feed; LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end()); for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - serialized_feed << f.make_bucket_id(i); - auto document_update = f.make_document_update(i); + auto n = bucket_selector(i); + serialized_feed << f.make_bucket_id(n); + auto document_update = f.make_document_update(n, i); document_update->serializeHEAD(serialized_feed); } return serialized_feed; @@ -779,6 +915,7 @@ void run_update_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream> &serialized_feed_v, const BMParams& bm_params) { LOG(info, "updateAsync %u small documents, pass=%u", bm_params.get_documents(), pass); + uint32_t old_errors = f._feed_handler->get_error_count(); auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_threads(); ++i) { auto range = bm_params.get_range(i); @@ -788,18 +925,20 @@ run_update_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecu executor.sync(); auto end_time = std::chrono::steady_clock::now(); std::chrono::duration<double> elapsed = end_time - start_time; - LOG(info, "%8.2f updates/s for pass=%u", bm_params.get_documents() / elapsed.count(), pass); + uint32_t new_errors = f._feed_handler->get_error_count() - old_errors; + LOG(info, "%8.2f updates/s, %u errors for pass=%u", bm_params.get_documents() / elapsed.count(), new_errors, pass); time_bias += bm_params.get_documents(); } vespalib::nbostream -make_remove_feed(PersistenceProviderFixture &f, BMRange range) +make_remove_feed(PersistenceProviderFixture &f, BMRange range, BucketSelector bucket_selector) { vespalib::nbostream serialized_feed; LOG(debug, "make_update_feed([%u..%u))", range.get_start(), range.get_end()); for (unsigned int i = range.get_start(); i < range.get_end(); ++i) { - serialized_feed << f.make_bucket_id(i); - auto document_id = f.make_document_id(i); + auto n = bucket_selector(i); + serialized_feed << f.make_bucket_id(n); + auto document_id = f.make_document_id(n, i); vespalib::string raw_id = document_id.toString(); serialized_feed.write(raw_id.c_str(), raw_id.size() + 1); } @@ -828,6 +967,7 @@ void run_remove_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecutor &executor, int pass, int64_t& time_bias, const std::vector<vespalib::nbostream> &serialized_feed_v, const BMParams &bm_params) { LOG(info, "removeAsync %u small documents, pass=%u", bm_params.get_documents(), pass); + uint32_t old_errors = f._feed_handler->get_error_count(); auto start_time = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < bm_params.get_threads(); ++i) { auto range = bm_params.get_range(i); @@ -837,7 +977,8 @@ run_remove_async_tasks(PersistenceProviderFixture &f, vespalib::ThreadStackExecu executor.sync(); auto end_time = std::chrono::steady_clock::now(); std::chrono::duration<double> elapsed = end_time - start_time; - LOG(info, "%8.2f removes/s for pass=%u", bm_params.get_documents() / elapsed.count(), pass); + uint32_t new_errors = f._feed_handler->get_error_count() - old_errors; + LOG(info, "%8.2f removes/s, %u errors for pass=%u", bm_params.get_documents() / elapsed.count(), new_errors, pass); time_bias += bm_params.get_documents(); } @@ -851,12 +992,16 @@ void benchmark_async_spi(const BMParams &bm_params) LOG(info, "create %u buckets", f.num_buckets()); f.create_buckets(); if (bm_params.get_enable_service_layer()) { - f.start_service_layer(bm_params.get_use_storage_chain()); + f.start_service_layer(bm_params); } + if (bm_params.get_enable_distributor()) { + f.start_distributor(bm_params); + } + f.create_feed_handler(bm_params); vespalib::ThreadStackExecutor executor(bm_params.get_threads(), 128 * 1024); - auto put_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_put_feed(f, range); }, "put"); - auto update_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_update_feed(f, range); }, "update"); - auto remove_feed = make_feed(executor, bm_params, [&f](BMRange range) { return make_remove_feed(f, range); }, "remove"); + auto put_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_put_feed(f, range, bucket_selector); }, f.num_buckets(), "put"); + auto update_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_update_feed(f, range, bucket_selector); }, f.num_buckets(), "update"); + auto remove_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_remove_feed(f, range, bucket_selector); }, f.num_buckets(), "remove"); int64_t time_bias = 1; for (uint32_t pass = 0; pass < bm_params.get_put_passes(); ++pass) { run_put_async_tasks(f, executor, pass, time_bias, put_feed, bm_params); @@ -867,6 +1012,8 @@ void benchmark_async_spi(const BMParams &bm_params) for (uint32_t pass = 0; pass < bm_params.get_remove_passes(); ++pass) { run_remove_async_tasks(f, executor, pass, time_bias, remove_feed, bm_params); } + f.shutdown_feed_handler(); + f.shutdown_distributor(); f.shutdown_service_layer(); } @@ -903,6 +1050,7 @@ App::usage() "[--update-passes update-passes]\n" "[--remove-passes remove-passes]\n" "[--rpc-network-threads threads]\n" + "[--enable-distributor]\n" "[--enable-service-layer]\n" "[--use-storage-chain]" << std::endl; } @@ -920,6 +1068,7 @@ App::get_options() { "update-passes", 1, nullptr, 0 }, { "remove-passes", 1, nullptr, 0 }, { "rpc-network-threads", 1, nullptr, 0 }, + { "enable-distributor", 0, nullptr, 0 }, { "enable-service-layer", 0, nullptr, 0 }, { "use-storage-chain", 0, nullptr, 0 } }; @@ -930,6 +1079,7 @@ App::get_options() LONGOPT_UPDATE_PASSES, LONGOPT_REMOVE_PASSES, LONGOPT_RPC_NETWORK_THREADS, + LONGOPT_ENABLE_DISTRIBUTOR, LONGOPT_ENABLE_SERVICE_LAYER, LONGOPT_USE_STORAGE_CHAIN }; @@ -957,6 +1107,9 @@ App::get_options() case LONGOPT_RPC_NETWORK_THREADS: _bm_params.set_rpc_network_threads(atoi(opt_argument)); break; + case LONGOPT_ENABLE_DISTRIBUTOR: + _bm_params.set_enable_distributor(true); + break; case LONGOPT_ENABLE_SERVICE_LAYER: _bm_params.set_enable_service_layer(true); break; diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp index 9db1fee58e1..1f979d1566c 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/lid_reuse_delayer_config.cpp @@ -19,7 +19,7 @@ LidReuseDelayerConfig::LidReuseDelayerConfig() LidReuseDelayerConfig::LidReuseDelayerConfig(vespalib::duration visibilityDelay, bool hasIndexedOrAttributeFields_in) : _visibilityDelay(visibilityDelay), - _allowEarlyAck(visibilityDelay > vespalib::duration::zero()), + _allowEarlyAck(visibilityDelay > 1ms), _hasIndexedOrAttributeFields(hasIndexedOrAttributeFields_in) { } diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp index 066e135741e..ea5d46f02ad 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp @@ -218,6 +218,16 @@ CombiningFeedView::heartBeat(search::SerialNum serialNum) } } +bool +CombiningFeedView::allowEarlyAck() const { + for (const auto &view : _views) { + if ( ! view->allowEarlyAck() ) { + return false; + } + } + return true; +} + void CombiningFeedView::sync() { diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h index d1da0408318..3a37fdc37cb 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h @@ -83,6 +83,7 @@ public: // Called by document db executor void setCalculator(const IBucketStateCalculator::SP &newCalc); + bool allowEarlyAck() const override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h index 388a1101dcf..5ed0ad7492c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h @@ -135,7 +135,7 @@ public: } vespalib::duration getVisibilityDelay() const { return _visibilityDelay; } bool hasVisibilityDelay() const { return _visibilityDelay > vespalib::duration::zero(); } - bool allowEarlyAck() const { return hasVisibilityDelay(); } + bool allowEarlyAck() const { return _visibilityDelay > 1ms; } const DocumentDBLidSpaceCompactionConfig &getLidSpaceCompactionConfig() const { return _lidSpaceCompaction; } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 282ec3c9ace..8b82478c1a4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -93,18 +93,18 @@ TlsMgrWriter::sync(SerialNum syncTo) { for (int retryCount = 0; retryCount < 10; ++retryCount) { SerialNum syncedTo(0); - LOG(spam, "Trying tls sync(%" PRIu64 ")", syncTo); + LOG(debug, "Trying tls sync(%" PRIu64 ")", syncTo); bool res = _tls_mgr.getSession()->sync(syncTo, syncedTo); if (!res) { - LOG(spam, "Tls sync failed, retrying"); + LOG(debug, "Tls sync failed, retrying"); sleep(1); continue; } if (syncedTo >= syncTo) { - LOG(spam, "Tls sync complete, reached %" PRIu64", returning", syncedTo); + LOG(debug, "Tls sync complete, reached %" PRIu64", returning", syncedTo); return syncedTo; } - LOG(spam, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo); + LOG(debug, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo); } throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo)); } @@ -402,6 +402,9 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _tlsReplayProgress(), _serialNum(0), _prunedSerialNum(0), + _numOperationsPendingCommit(0), + _numOperationsCompleted(0), + _numCommitsCompleted(0), _delayedPrune(false), _feedLock(), _feedState(make_shared<InitState>(getDocTypeName())), @@ -495,11 +498,46 @@ FeedHandler::getTransactionLogReplayDone() const { } void +FeedHandler::onCommitDone(size_t numPendingAtStart) { + assert(numPendingAtStart <= _numOperationsPendingCommit); + _numOperationsPendingCommit -= numPendingAtStart; + _numOperationsCompleted += numPendingAtStart; + _numCommitsCompleted++; + if (_numOperationsPendingCommit > 0) { + enqueCommitTask(); + } + LOG(spam, "%zu: onCommitDone(%zu) total=%zu left=%zu", + _numCommitsCompleted, numPendingAtStart, _numOperationsCompleted, _numOperationsPendingCommit); +} + +void FeedHandler::enqueCommitTask() { + _writeService.master().execute(makeLambdaTask([this]() { initiateCommit(); })); +} + +void +FeedHandler::initiateCommit() { + auto onCommitDoneContext = std::make_shared<OnCommitDone>( + _writeService.master(), + makeLambdaTask([this, numPendingAtStart=_numOperationsPendingCommit]() { + onCommitDone(numPendingAtStart); + })); + auto commitResult = _tlsWriter->startCommit(onCommitDoneContext); + if (_activeFeedView && ! _activeFeedView->allowEarlyAck()) { + using KeepAlivePair = KeepAlive<std::pair<CommitResult, DoneCallback>>; + auto pair = std::make_pair(std::move(commitResult), std::move(onCommitDoneContext)); + _activeFeedView->forceCommit(_serialNum, std::make_shared<KeepAlivePair>(std::move(pair))); + } +} + +void FeedHandler::appendOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) { if (!op.getSerialNum()) { const_cast<FeedOperation &>(op).setSerialNum(incSerialNum()); } _tlsWriter->appendOperation(op, std::move(onDone)); + if (++_numOperationsPendingCommit == 1) { + enqueCommitTask(); + } } FeedHandler::CommitResult diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 4807c596130..c295b26a759 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -76,6 +76,9 @@ private: // the serial num of the last message in the transaction log SerialNum _serialNum; SerialNum _prunedSerialNum; + size_t _numOperationsPendingCommit; + size_t _numOperationsCompleted; + size_t _numCommitsCompleted; bool _delayedPrune; mutable std::shared_mutex _feedLock; FeedStateSP _feedState; @@ -125,6 +128,9 @@ private: FeedStateSP getFeedState() const; void changeFeedState(FeedStateSP newState); void doChangeFeedState(FeedStateSP newState); + void onCommitDone(size_t numPendingAtStart); + void initiateCommit(); + void enqueCommitTask(); public: FeedHandler(const FeedHandler &) = delete; FeedHandler & operator = (const FeedHandler &) = delete; diff --git a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h index 4b028a289a9..8dd7ae8474e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h @@ -65,6 +65,7 @@ public: virtual void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation & pruneOp) = 0; virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) = 0; virtual ILidCommitState & getUncommittedLidsTracker() = 0; + virtual bool allowEarlyAck() const = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index bda94c22adc..90875aa8591 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -275,7 +275,7 @@ StoreOnlyFeedView::internalForceCommit(SerialNum serialNum, OnForceCommitDoneTyp void StoreOnlyFeedView::considerEarlyAck(FeedToken & token) { - if ( _lidReuseDelayer.allowEarlyAck() && token) { + if (allowEarlyAck() && token) { token.reset(); } } @@ -349,6 +349,11 @@ StoreOnlyFeedView::needImmediateCommit() const { return _lidReuseDelayer.needImmediateCommit(); } +bool +StoreOnlyFeedView::allowEarlyAck() const { + return _lidReuseDelayer.allowEarlyAck(); +} + void StoreOnlyFeedView::heartBeatIndexedFields(SerialNum ) {} diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index 4569e01f9fd..20942423995 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -264,6 +264,7 @@ public: void handlePruneRemovedDocuments(const PruneRemovedDocumentsOperation &pruneOp) override; void handleCompactLidSpace(const CompactLidSpaceOperation &op) override; ILidCommitState & getUncommittedLidsTracker() override; + bool allowEarlyAck() const final override; }; } diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h index adfc911c8df..b96fd77409c 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h @@ -34,6 +34,7 @@ struct DummyFeedView : public IFeedView void handleCompactLidSpace(const CompactLidSpaceOperation &) override {} void forceCommit(search::SerialNum, DoneCallback) override { } ILidCommitState & getUncommittedLidsTracker() override; + bool allowEarlyAck() const override { return false; } }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 9e0f1a8a1aa..126a7afed4d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -113,7 +113,12 @@ Domain::addPart(SerialNum partId, bool isLastPart) { } } -Domain::~Domain() { } +Domain::~Domain() { + MonitorGuard guard(_currentChunkMonitor); + guard.broadcast(); + commitChunk(grabCurrentChunk(guard), guard); + _singleCommitter->shutdown().sync(); +} DomainInfo Domain::getDomainInfo() const @@ -204,6 +209,10 @@ Domain::getSynced() const void Domain::triggerSyncNow() { + { + vespalib::MonitorGuard guard(_currentChunkMonitor); + commitAndTransferResponses(guard); + } MonitorGuard guard(_syncMonitor); if (!_pendingSync) { _pendingSync = true; @@ -318,22 +327,78 @@ Domain::optionallyRotateFile(SerialNum serialNum) { return dp; } +void +Domain::append(const Packet & packet, Writer::DoneCallback onDone) { + vespalib::MonitorGuard guard(_currentChunkMonitor); + if (_lastSerial >= packet.range().from()) { + throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").", + packet.range().from(), _lastSerial)); + } else { + _lastSerial = packet.range().to(); + } + _currentChunk->add(packet, std::move(onDone)); + commitIfFull(guard); +} + Domain::CommitResult Domain::startCommit(DoneCallback onDone) { - (void) onDone; + vespalib::MonitorGuard guard(_currentChunkMonitor); + if ( !_currentChunk->empty() ) { + auto completed = grabCurrentChunk(guard); + completed->setCommitDoneCallback(std::move(onDone)); + CommitResult result(completed->createCommitResult()); + commitChunk(std::move(completed), guard); + return result; + } return CommitResult(); } void -Domain::append(const Packet & packet, Writer::DoneCallback onDone) -{ - (void) onDone; +Domain::commitIfFull(const vespalib::MonitorGuard &guard) { + if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) { + commitAndTransferResponses(guard); + } +} + +void +Domain::commitAndTransferResponses(const vespalib::MonitorGuard &guard) { + auto completed = std::move(_currentChunk); + _currentChunk = std::make_unique<CommitChunk>(_config.getChunkSizeLimit(), completed->stealCallbacks()); + commitChunk(std::move(completed), guard); +} + +std::unique_ptr<CommitChunk> +Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { + assert(guard.monitors(_currentChunkMonitor)); + auto chunk = std::move(_currentChunk); + _currentChunk = createCommitChunk(_config); + return chunk; +} + +void +Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) { + assert(chunkOrderGuard.monitors(_currentChunkMonitor)); + _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable { + doCommit(std::move(chunk)); + })); +} + +void +Domain::doCommit(std::unique_ptr<CommitChunk> chunk) { + const Packet & packet = chunk->getPacket(); + if (packet.empty()) return; + vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size()); Packet::Entry entry; entry.deserialize(is); DomainPart::SP dp = optionallyRotateFile(entry.serial()); dp->commit(entry.serial(), packet); + if (_config.getFSyncOnCommit()) { + dp->sync(); + } cleanSessions(); + LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.", + chunk->getNumCallBacks(), chunk->getPacket().size(), chunk->sizeBytes()); } bool diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 7e77e6ef0ef..e41ad930840 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -56,6 +56,12 @@ public: uint64_t size() const; Domain & setConfig(const DomainConfig & cfg); private: + void commitIfFull(const vespalib::MonitorGuard & guard); + void commitAndTransferResponses(const vespalib::MonitorGuard & guard); + + std::unique_ptr<CommitChunk> grabCurrentChunk(const vespalib::MonitorGuard & guard); + void commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard); + void doCommit(std::unique_ptr<CommitChunk> chunk); SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; size_t byteSize(const vespalib::LockGuard & guard) const; diff --git a/simplemetrics/src/main/java/com/yahoo/metrics/simple/Measurement.java b/simplemetrics/src/main/java/com/yahoo/metrics/simple/Measurement.java index cc7a4b0f717..4098ac1bdea 100644 --- a/simplemetrics/src/main/java/com/yahoo/metrics/simple/Measurement.java +++ b/simplemetrics/src/main/java/com/yahoo/metrics/simple/Measurement.java @@ -2,12 +2,12 @@ package com.yahoo.metrics.simple; /** - * Wrapper class for the actually measured value. Candidate for removal, but I - * wanted a type instead of some opaque instance of Number. + * Wrapper class for the actually measured value. * * @author Steinar Knutsen */ public class Measurement { + private final Number magnitude; public Measurement(Number magnitude) { diff --git a/simplemetrics/src/main/java/com/yahoo/metrics/simple/Sample.java b/simplemetrics/src/main/java/com/yahoo/metrics/simple/Sample.java index 837e93de09a..0d2144deeb4 100644 --- a/simplemetrics/src/main/java/com/yahoo/metrics/simple/Sample.java +++ b/simplemetrics/src/main/java/com/yahoo/metrics/simple/Sample.java @@ -42,8 +42,7 @@ public class Sample { * Get histogram definition for an arbitrary metric. Caveat emptor: This * involves reading a volatile. * - * @param metricName - * name of the metric to get histogram definition for + * @param metricName name of the metric to get histogram definition for * @return how to define a new histogram or null */ MetricSettings getHistogramDefinition(String metricName) { diff --git a/simplemetrics/src/main/java/com/yahoo/metrics/simple/jdisc/JdiscMetricsFactory.java b/simplemetrics/src/main/java/com/yahoo/metrics/simple/jdisc/JdiscMetricsFactory.java index ae84ee95ecc..30102c43919 100644 --- a/simplemetrics/src/main/java/com/yahoo/metrics/simple/jdisc/JdiscMetricsFactory.java +++ b/simplemetrics/src/main/java/com/yahoo/metrics/simple/jdisc/JdiscMetricsFactory.java @@ -14,7 +14,7 @@ import com.yahoo.metrics.simple.MetricReceiver; /** * A factory for all the JDisc API classes. * - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + * @author Steinar Knutsen */ public class JdiscMetricsFactory implements MetricConsumerFactory, SnapshotProvider { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index b931a7f2617..aa914f852c3 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -572,27 +572,35 @@ public class IOThread implements Runnable, AutoCloseable { public void checkOldConnections() { List<GatewayConnection> toRemove = null; - for (GatewayConnection connection : connections) { - if (closingTime(connection).isBefore(clock.instant())) { - try { - IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); - connection.close(); - if (toRemove == null) - toRemove = new ArrayList<>(1); - toRemove.add(connection); - } catch (Exception e) { - // Old connection; best effort - } - } else if (timeToPoll(connection)) { - try { - IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); - } catch (Exception e) { - // Old connection; best effort + try { + for (GatewayConnection connection : connections) { + if (closingTime(connection).isBefore(clock.instant())) { + try { + try { + IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); + } finally { + connection.close(); + } + } catch (Exception e) { + // Old connection; best effort + } finally { + if (toRemove == null) + toRemove = new ArrayList<>(1); + toRemove.add(connection); + } + } else if (timeToPoll(connection)) { + try { + IOThread.processResponse(connection.poll(), endpoint, clusterId, statusReceivedCounter, resultQueue); + } catch (Exception e) { + // Old connection; best effort + } } } + } finally { + if (toRemove != null) + connections.removeAll(toRemove); + } - if (toRemove != null) - connections.removeAll(toRemove); } private boolean timeToPoll(GatewayConnection connection) { diff --git a/zkfacade/abi-spec.json b/zkfacade/abi-spec.json index f4ad1ab4372..e026559b283 100644 --- a/zkfacade/abi-spec.json +++ b/zkfacade/abi-spec.json @@ -105,6 +105,7 @@ ], "methods": [ "public void <init>(java.lang.String, com.yahoo.vespa.curator.Curator)", + "public void <init>(java.lang.String, org.apache.curator.framework.recipes.locks.InterProcessLock)", "public void acquire(java.time.Duration)", "public void close()" ], diff --git a/zkfacade/pom.xml b/zkfacade/pom.xml index de542b89b67..7f335467751 100644 --- a/zkfacade/pom.xml +++ b/zkfacade/pom.xml @@ -76,6 +76,11 @@ <artifactId>zookeeper</artifactId> <version>${zookeeper.client.version}</version> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java index d97d8f5ed71..920bba22804 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Lock.java @@ -4,11 +4,11 @@ package com.yahoo.vespa.curator; import com.google.common.util.concurrent.UncheckedTimeoutException; import com.yahoo.path.Path; import com.yahoo.transaction.Mutex; +import com.yahoo.vespa.curator.stats.ThreadLockStats; import org.apache.curator.framework.recipes.locks.InterProcessLock; import java.time.Duration; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; /** * A cluster-wide re-entrant mutex which is released on (the last symmetric) close. @@ -20,54 +20,51 @@ import java.util.concurrent.locks.ReentrantLock; */ public class Lock implements Mutex { - private final ReentrantLock lock; private final InterProcessLock mutex; private final String lockPath; public Lock(String lockPath, Curator curator) { + this(lockPath, curator.createMutex(lockPath)); + } + + /** Public for testing only */ + public Lock(String lockPath, InterProcessLock mutex) { this.lockPath = lockPath; - this.lock = new ReentrantLock(true); - mutex = curator.createMutex(lockPath); + this.mutex = mutex; } /** Take the lock with the given timeout. This may be called multiple times from the same thread - each matched by a close */ public void acquire(Duration timeout) throws UncheckedTimeoutException { + ThreadLockStats threadLockStats = ThreadLockStats.getCurrentThreadLockStats(); + threadLockStats.invokingAcquire(lockPath, timeout); + + final boolean acquired; try { - if ( ! mutex.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS)) - throw new UncheckedTimeoutException("Timed out after waiting " + timeout + - " to acquire lock '" + lockPath + "'"); - if ( ! lock.tryLock()) { // Should be available to only this thread, while holding the above mutex. - release(); - throw new IllegalStateException("InterProcessMutex acquired, but guarded lock held by someone else, for lock '" + lockPath + "'"); - } - } - catch (UncheckedTimeoutException | IllegalStateException e) { - throw e; - } - catch (Exception e) { + acquired = mutex.acquire(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (Exception e) { + threadLockStats.acquireFailed(lockPath); throw new RuntimeException("Exception acquiring lock '" + lockPath + "'", e); } - } - @Override - public void close() { - try { - lock.unlock(); - } - finally { - release(); + if (!acquired) { + threadLockStats.acquireTimedOut(lockPath); + throw new UncheckedTimeoutException("Timed out after waiting " + timeout + + " to acquire lock '" + lockPath + "'"); } + threadLockStats.lockAcquired(lockPath); } - private void release() { + @Override + public void close() { try { mutex.release(); + ThreadLockStats.getCurrentThreadLockStats().lockReleased(lockPath); } catch (Exception e) { + ThreadLockStats.getCurrentThreadLockStats().lockReleaseFailed(lockPath); throw new RuntimeException("Exception releasing lock '" + lockPath + "'"); } } - } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java new file mode 100644 index 00000000000..3b06377ccf7 --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttempt.java @@ -0,0 +1,101 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator.stats; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; + +/** + * Information about a lock. + * + * <p>Should be mutated by a single thread, except {@link #fillStackTrace()} which can be + * invoked by any threads. Other threads may see an inconsistent state of this instance.</p> + * + * @author hakon + */ +public class LockAttempt { + + private final ThreadLockStats threadLockStats; + private final String lockPath; + private final Instant callAcquireInstant; + private final Duration timeout; + + private volatile Optional<Instant> lockAcquiredInstant = Optional.empty(); + private volatile Optional<Instant> terminalStateInstant = Optional.empty(); + private volatile Optional<String> stackTrace = Optional.empty(); + + public static LockAttempt invokingAcquire(ThreadLockStats threadLockStats, String lockPath, Duration timeout) { + return new LockAttempt(threadLockStats, lockPath, timeout, Instant.now()); + } + + public enum LockState { + ACQUIRING(false), ACQUIRE_FAILED(true), TIMED_OUT(true), ACQUIRED(false), RELEASED(true), + RELEASED_WITH_ERROR(true); + + private final boolean terminal; + + LockState(boolean terminal) { this.terminal = terminal; } + + public boolean isTerminal() { return terminal; } + } + + private volatile LockState lockState = LockState.ACQUIRING; + + private LockAttempt(ThreadLockStats threadLockStats, String lockPath, Duration timeout, Instant callAcquireInstant) { + this.threadLockStats = threadLockStats; + this.lockPath = lockPath; + this.callAcquireInstant = callAcquireInstant; + this.timeout = timeout; + } + + public String getThreadName() { return threadLockStats.getThreadName(); } + public String getLockPath() { return lockPath; } + public Instant getTimeAcquiredWasInvoked() { return callAcquireInstant; } + public Duration getAcquireTimeout() { return timeout; } + public LockState getLockState() { return lockState; } + public Optional<Instant> getTimeLockWasAcquired() { return lockAcquiredInstant; } + public Optional<Instant> getTimeTerminalStateWasReached() { return terminalStateInstant; } + public Optional<String> getStackTrace() { return stackTrace; } + + public Duration getDurationOfAcquire() { + return Duration.between(callAcquireInstant, lockAcquiredInstant.orElseGet(Instant::now)); + } + + public Duration getDurationWithLock() { + return lockAcquiredInstant + .map(start -> Duration.between(start, terminalStateInstant.orElseGet(Instant::now))) + .orElse(Duration.ZERO); + } + + public Duration getDuration() { return Duration.between(callAcquireInstant, terminalStateInstant.orElseGet(Instant::now)); } + + /** Get time from just before trying to acquire lock to the time the terminal state was reached, or ZERO. */ + public Duration getStableTotalDuration() { + return terminalStateInstant.map(instant -> Duration.between(callAcquireInstant, instant)).orElse(Duration.ZERO); + } + + /** Fill in the stack trace starting at the caller's stack frame. */ + public void fillStackTrace() { + // This method is public. If invoked concurrently, the this.stackTrace may be updated twice, + // which is fine. + + this.stackTrace = Optional.of(threadLockStats.getStackTrace()); + } + + void acquireFailed() { setTerminalState(LockState.ACQUIRE_FAILED); } + void timedOut() { setTerminalState(LockState.TIMED_OUT); } + void released() { setTerminalState(LockState.RELEASED); } + void releasedWithError() { setTerminalState(LockState.RELEASED_WITH_ERROR); } + + void lockAcquired() { + lockState = LockState.ACQUIRED; + lockAcquiredInstant = Optional.of(Instant.now()); + } + + void setTerminalState(LockState terminalState) { setTerminalState(terminalState, Instant.now()); } + + void setTerminalState(LockState terminalState, Instant instant) { + lockState = terminalState; + terminalStateInstant = Optional.of(instant); + } +} diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttemptSamples.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttemptSamples.java new file mode 100644 index 00000000000..54cb82ebc1e --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockAttemptSamples.java @@ -0,0 +1,106 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator.stats; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; + +/** + * Collection containing "interesting" {@code LockAttempt}s. + * + * @author hakon + */ +// @ThreadSafe +public class LockAttemptSamples { + private final int maxSamples; + + /** Ensure atomic operations on this collection. */ + private final Object monitor = new Object(); + + /** Keep at most one sample for each lock path. */ + private final Map<String, LockAttempt> byLockPath; + + /** + * Priority queue containing all samples. The head of this queue (peek()/poll()) + * returns the LockAttempt with the smallest duration. + */ + private final PriorityQueue<LockAttempt> priorityQueue = + new PriorityQueue<>(Comparator.comparing(LockAttempt::getStableTotalDuration)); + + LockAttemptSamples() { this(10); } + + LockAttemptSamples(int maxSamples) { + this.maxSamples = maxSamples; + this.byLockPath = new HashMap<>(maxSamples); + } + + int size() { return byLockPath.size(); } + + boolean maybeSample(LockAttempt lockAttempt) { + final boolean added; + synchronized (monitor) { + if (shouldAdd(lockAttempt)) { + byLockPath.put(lockAttempt.getLockPath(), lockAttempt); + priorityQueue.add(lockAttempt); + added = true; + } else { + added = false; + } + } + + if (added) { + // Unnecessary to invoke under synchronized, although it means that some samples + // may be without stack trace (just retry if that happens). + lockAttempt.fillStackTrace(); + } + + return added; + } + + private boolean shouldAdd(LockAttempt lockAttempt) { + LockAttempt existingLockAttempt = byLockPath.get(lockAttempt.getLockPath()); + if (existingLockAttempt != null) { + if (hasLongerDurationThan(lockAttempt, existingLockAttempt)) { + byLockPath.remove(existingLockAttempt.getLockPath()); + priorityQueue.remove(existingLockAttempt); + return true; + } + + return false; + } + + if (size() < maxSamples) { + return true; + } + + // peek() and poll() retrieves the smallest element. + existingLockAttempt = priorityQueue.peek(); // cannot be null + if (hasLongerDurationThan(lockAttempt, existingLockAttempt)) { + priorityQueue.poll(); + byLockPath.remove(existingLockAttempt.getLockPath()); + return true; + } + + return false; + } + + List<LockAttempt> asList() { + synchronized (monitor) { + return List.copyOf(byLockPath.values()); + } + } + + void clear() { + synchronized (monitor) { + byLockPath.clear(); + priorityQueue.clear(); + } + } + + private static boolean hasLongerDurationThan(LockAttempt lockAttempt, LockAttempt otherLockAttempt) { + // Use stable total duration to avoid messing up priority queue. + return lockAttempt.getStableTotalDuration().compareTo(otherLockAttempt.getStableTotalDuration()) > 0; + } +} diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockCounters.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockCounters.java new file mode 100644 index 00000000000..561ea9a7ed2 --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LockCounters.java @@ -0,0 +1,66 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator.stats; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A collection of counters for events related to lock acquisition and release. + * + * @author hakon + */ +public class LockCounters { + final AtomicInteger invokeAcquireCount = new AtomicInteger(0); + final AtomicInteger inCriticalRegionCount = new AtomicInteger(0); + final AtomicInteger acquireFailedCount = new AtomicInteger(0); + final AtomicInteger acquireTimedOutCount = new AtomicInteger(0); + final AtomicInteger lockAcquiredCount = new AtomicInteger(0); + final AtomicInteger locksReleasedCount = new AtomicInteger(0); + + final AtomicInteger noLocksErrorCount = new AtomicInteger(0); + final AtomicInteger lockReleaseErrorCount = new AtomicInteger(0); + + public int invokeAcquireCount() { return invokeAcquireCount.get(); } + public int inCriticalRegionCount() { return inCriticalRegionCount.get(); } + public int acquireFailedCount() { return acquireFailedCount.get(); } + public int acquireTimedOutCount() { return acquireTimedOutCount.get(); } + public int lockAcquiredCount() { return lockAcquiredCount.get(); } + public int locksReleasedCount() { return locksReleasedCount.get(); } + public int noLocksErrorCount() { return noLocksErrorCount.get(); } + public int lockReleaseErrorCount() { return lockReleaseErrorCount.get(); } + + @Override + public String toString() { + return "LockCounters{" + + "invokeAcquireCount=" + invokeAcquireCount + + ", inCriticalRegionCount=" + inCriticalRegionCount + + ", acquireFailedCount=" + acquireFailedCount + + ", acquireTimedOutCount=" + acquireTimedOutCount + + ", lockAcquiredCount=" + lockAcquiredCount + + ", locksReleasedCount=" + locksReleasedCount + + ", noLocksErrorCount=" + noLocksErrorCount + + ", locksReleaseErrorCount=" + lockReleaseErrorCount + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + LockCounters that = (LockCounters) o; + return invokeAcquireCount.get() == that.invokeAcquireCount.get() && + inCriticalRegionCount.get() == that.inCriticalRegionCount.get() && + acquireFailedCount.get() == that.acquireFailedCount.get() && + acquireTimedOutCount.get() == that.acquireTimedOutCount.get() && + lockAcquiredCount.get() == that.lockAcquiredCount.get() && + locksReleasedCount.get() == that.locksReleasedCount.get() && + noLocksErrorCount.get() == that.noLocksErrorCount.get() && + lockReleaseErrorCount.get() == that.lockReleaseErrorCount.get(); + } + + @Override + public int hashCode() { + return Objects.hash(invokeAcquireCount, inCriticalRegionCount, acquireFailedCount, acquireTimedOutCount, + lockAcquiredCount, locksReleasedCount, noLocksErrorCount, lockReleaseErrorCount); + } +} diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java new file mode 100644 index 00000000000..db26523ec37 --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/ThreadLockStats.java @@ -0,0 +1,143 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator.stats; + +import com.yahoo.vespa.curator.Lock; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Consumer; + +/** + * This class contains process-wide statistics and information related to acquiring and releasing + * {@link Lock}. Instances of this class contain information tied to a specific thread and lock path. + * + * <p>Instances of this class are thread-safe as long as foreign threads (!= this.thread) avoid mutable methods.</p> + * + * @author hakon + */ +public class ThreadLockStats { + + private static final ConcurrentHashMap<Thread, ThreadLockStats> locks = new ConcurrentHashMap<>(); + + private static final LockAttemptSamples COMPLETED_LOCK_ATTEMPT_SAMPLES = new LockAttemptSamples(); + + private static final ConcurrentHashMap<String, LockCounters> countersByLockPath = new ConcurrentHashMap<>(); + + private final Thread thread; + + /** The locks are reentrant so there may be more than 1 lock for this thread. */ + private final ConcurrentLinkedDeque<LockAttempt> lockAttempts = new ConcurrentLinkedDeque<>(); + + public static Map<String, LockCounters> getLockCountersByPath() { return Map.copyOf(countersByLockPath); } + + public static List<ThreadLockStats> getThreadLockStats() { return List.copyOf(locks.values()); } + + public static List<LockAttempt> getLockAttemptSamples() { + return COMPLETED_LOCK_ATTEMPT_SAMPLES.asList(); + } + + /** Returns the per-thread singleton ThreadLockStats. */ + public static ThreadLockStats getCurrentThreadLockStats() { + return locks.computeIfAbsent(Thread.currentThread(), ThreadLockStats::new); + } + + static void clearStaticDataForTesting() { + locks.clear(); + COMPLETED_LOCK_ATTEMPT_SAMPLES.clear(); + countersByLockPath.clear(); + } + + ThreadLockStats(Thread currentThread) { + this.thread = currentThread; + } + + public String getThreadName() { return thread.getName(); } + + public String getStackTrace() { + var stackTrace = new StringBuilder(); + + StackTraceElement[] elements = thread.getStackTrace(); + for (int i = 0; i < elements.length; ++i) { + var element = elements[i]; + stackTrace.append(element.getClassName()) + .append('.') + .append(element.getMethodName()) + .append('(') + .append(element.getFileName()) + .append(':') + .append(element.getLineNumber()) + .append(")\n"); + } + + return stackTrace.toString(); + } + + public List<LockAttempt> getLockAttempts() { return List.copyOf(lockAttempts); } + + /** Mutable method (see class doc) */ + public void invokingAcquire(String lockPath, Duration timeout) { + LockCounters lockCounters = getLockCounters(lockPath); + lockCounters.invokeAcquireCount.incrementAndGet(); + lockCounters.inCriticalRegionCount.incrementAndGet(); + lockAttempts.addLast(LockAttempt.invokingAcquire(this, lockPath, timeout)); + } + + /** Mutable method (see class doc) */ + public void acquireFailed(String lockPath) { + LockCounters lockCounters = getLockCounters(lockPath); + lockCounters.acquireFailedCount.incrementAndGet(); + removeLastLockAttempt(lockCounters, LockAttempt::acquireFailed); + } + + /** Mutable method (see class doc) */ + public void acquireTimedOut(String lockPath) { + LockCounters lockCounters = getLockCounters(lockPath); + + lockCounters.acquireTimedOutCount.incrementAndGet(); + removeLastLockAttempt(lockCounters, LockAttempt::timedOut); + } + + /** Mutable method (see class doc) */ + public void lockAcquired(String lockPath) { + getLockCounters(lockPath).lockAcquiredCount.incrementAndGet(); + LockAttempt lastLockAttempt = lockAttempts.peekLast(); + if (lastLockAttempt == null) { + throw new IllegalStateException("lockAcquired invoked without lockAttempts"); + } + lastLockAttempt.lockAcquired(); + } + + /** Mutable method (see class doc) */ + public void lockReleased(String lockPath) { + LockCounters lockCounters = getLockCounters(lockPath); + lockCounters.locksReleasedCount.incrementAndGet(); + removeLastLockAttempt(lockCounters, LockAttempt::released); + } + + /** Mutable method (see class doc) */ + public void lockReleaseFailed(String lockPath) { + LockCounters lockCounters = getLockCounters(lockPath); + lockCounters.lockReleaseErrorCount.incrementAndGet(); + removeLastLockAttempt(lockCounters, LockAttempt::releasedWithError); + } + + private LockCounters getLockCounters(String lockPath) { + return countersByLockPath.computeIfAbsent(lockPath, __ -> new LockCounters()); + } + + private void removeLastLockAttempt(LockCounters lockCounters, Consumer<LockAttempt> completeLockAttempt) { + lockCounters.inCriticalRegionCount.decrementAndGet(); + + if (lockAttempts.isEmpty()) { + lockCounters.noLocksErrorCount.incrementAndGet(); + return; + } + + LockAttempt lockAttempt = lockAttempts.pollLast(); + completeLockAttempt.accept(lockAttempt); + COMPLETED_LOCK_ATTEMPT_SAMPLES.maybeSample(lockAttempt); + } +} diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/package-info.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/package-info.java new file mode 100644 index 00000000000..15a81ffea70 --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/package-info.java @@ -0,0 +1,5 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.vespa.curator.stats; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockAttemptSamplesTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockAttemptSamplesTest.java new file mode 100644 index 00000000000..14dbbad56ba --- /dev/null +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockAttemptSamplesTest.java @@ -0,0 +1,62 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator.stats; + +import org.junit.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author hakon + */ +public class LockAttemptSamplesTest { + private final LockAttemptSamples samples = new LockAttemptSamples(2); + private ThreadLockStats threadLockStats; + + @Test + public void test() { + threadLockStats = new ThreadLockStats(Thread.currentThread()); + + assertTrue(maybeSample("1", 10)); + + // new sample has longer duration + assertTrue(maybeSample("1", 11)); + + // new sample has shorter duration + assertFalse(maybeSample("1", 10)); + + // new path, will be added + assertTrue(maybeSample("2", 5)); + + // new path, too low duration be added + assertFalse(maybeSample("3", 4)); + + // new path, expels "2" + assertTrue(maybeSample("4", 6)); + + Map<String, LockAttempt> lockAttempts = samples.asList().stream().collect(Collectors.toMap( + lockAttempt -> lockAttempt.getLockPath(), + lockAttempt -> lockAttempt)); + assertEquals(2, lockAttempts.size()); + + assertTrue(lockAttempts.containsKey("1")); + assertEquals(Duration.ofSeconds(11), lockAttempts.get("1").getStableTotalDuration()); + + assertTrue(lockAttempts.containsKey("4")); + assertEquals(Duration.ofSeconds(6), lockAttempts.get("4").getStableTotalDuration()); + } + + private boolean maybeSample(String lockPath, int secondsDuration) { + LockAttempt lockAttempt = LockAttempt.invokingAcquire(threadLockStats, lockPath, Duration.ofSeconds(1)); + Instant instant = lockAttempt.getTimeAcquiredWasInvoked().plus(Duration.ofSeconds(secondsDuration)); + lockAttempt.setTerminalState(LockAttempt.LockState.RELEASED, instant); + return samples.maybeSample(lockAttempt); + } + +}
\ No newline at end of file diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java new file mode 100644 index 00000000000..92911b0dadf --- /dev/null +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/stats/LockTest.java @@ -0,0 +1,140 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator.stats; + +import com.yahoo.vespa.curator.Lock; +import org.apache.curator.framework.recipes.locks.InterProcessLock; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * @author hakon + */ +public class LockTest { + private final InterProcessLock mutex = mock(InterProcessLock.class); + private final String lockPath = "/lock/path"; + private final Duration acquireTimeout = Duration.ofSeconds(10); + private final Lock lock = new Lock(lockPath, mutex); + + @Before + public void setUp() { + ThreadLockStats.clearStaticDataForTesting(); + } + + @Test + public void acquireThrows() throws Exception { + Exception exception = new Exception("example curator exception"); + when(mutex.acquire(anyLong(), any())).thenThrow(exception); + + try { + lock.acquire(acquireTimeout); + fail(); + } catch (Exception e) { + assertSame(e.getCause(), exception); + } + + var expectedCounters = new LockCounters(); + expectedCounters.invokeAcquireCount.set(1); + expectedCounters.acquireFailedCount.set(1); + assertEquals(Map.of(lockPath, expectedCounters), ThreadLockStats.getLockCountersByPath()); + + List<LockAttempt> slowLockAttempts = ThreadLockStats.getLockAttemptSamples(); + assertEquals(1, slowLockAttempts.size()); + LockAttempt slowLockAttempt = slowLockAttempts.get(0); + assertEquals(acquireTimeout, slowLockAttempt.getAcquireTimeout()); + Optional<String> stackTrace = slowLockAttempt.getStackTrace(); + assertTrue(stackTrace.isPresent()); + assertTrue("bad stacktrace: " + stackTrace.get(), stackTrace.get().contains(".Lock.acquire(Lock.java")); + assertEquals(LockAttempt.LockState.ACQUIRE_FAILED, slowLockAttempt.getLockState()); + assertTrue(slowLockAttempt.getTimeTerminalStateWasReached().isPresent()); + + List<ThreadLockStats> threadLockStatsList = ThreadLockStats.getThreadLockStats(); + assertEquals(1, threadLockStatsList.size()); + ThreadLockStats threadLockStats = threadLockStatsList.get(0); + assertEquals(0, threadLockStats.getLockAttempts().size()); + } + + @Test + public void acquireTimesOut() throws Exception { + when(mutex.acquire(anyLong(), any())).thenReturn(false); + + try { + lock.acquire(acquireTimeout); + fail(); + } catch (Exception e) { + assertTrue("unexpected exception: " + e.getMessage(), e.getMessage().contains("Timed out")); + } + + var expectedCounters = new LockCounters(); + expectedCounters.invokeAcquireCount.set(1); + expectedCounters.acquireTimedOutCount.set(1); + assertEquals(Map.of(lockPath, expectedCounters), ThreadLockStats.getLockCountersByPath()); + } + + @Test + public void acquired() throws Exception { + when(mutex.acquire(anyLong(), any())).thenReturn(true); + + lock.acquire(acquireTimeout); + + var expectedCounters = new LockCounters(); + expectedCounters.invokeAcquireCount.set(1); + expectedCounters.lockAcquiredCount.set(1); + expectedCounters.inCriticalRegionCount.set(1); + assertEquals(Map.of(lockPath, expectedCounters), ThreadLockStats.getLockCountersByPath()); + + // reenter + lock.acquire(acquireTimeout); + expectedCounters.invokeAcquireCount.set(2); + expectedCounters.lockAcquiredCount.set(2); + expectedCounters.inCriticalRegionCount.set(2); + + // inner-most closes + lock.close(); + expectedCounters.inCriticalRegionCount.set(1); + expectedCounters.locksReleasedCount.set(1); + assertEquals(Map.of(lockPath, expectedCounters), ThreadLockStats.getLockCountersByPath()); + + // outer-most closes + lock.close(); + expectedCounters.inCriticalRegionCount.set(0); + expectedCounters.locksReleasedCount.set(2); + assertEquals(Map.of(lockPath, expectedCounters), ThreadLockStats.getLockCountersByPath()); + } + + @Test + public void nestedLocks() throws Exception { + when(mutex.acquire(anyLong(), any())).thenReturn(true); + + String lockPath2 = "/lock/path/2"; + Lock lock2 = new Lock(lockPath2, mutex); + + lock.acquire(acquireTimeout); + lock2.acquire(acquireTimeout); + + List<ThreadLockStats> threadLockStats = ThreadLockStats.getThreadLockStats(); + assertEquals(1, threadLockStats.size()); + List<LockAttempt> lockAttempts = threadLockStats.get(0).getLockAttempts(); + assertEquals(2, lockAttempts.size()); + assertEquals(lockPath, lockAttempts.get(0).getLockPath()); + assertEquals(LockAttempt.LockState.ACQUIRED, lockAttempts.get(0).getLockState()); + assertEquals(lockPath2, lockAttempts.get(1).getLockPath()); + assertEquals(LockAttempt.LockState.ACQUIRED, lockAttempts.get(1).getLockState()); + + lock.close(); + lock.close(); + } +} |