// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hosted.provision.maintenance;

import com.yahoo.component.Version;
import com.yahoo.component.Vtag;
import com.yahoo.concurrent.UncheckedTimeoutException;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.ClusterMembership;
import com.yahoo.config.provision.ClusterSpec;
import com.yahoo.config.provision.NodeAllocationException;
import com.yahoo.config.provision.NodeResources;
import com.yahoo.config.provision.NodeType;
import com.yahoo.jdisc.Metric;
import com.yahoo.lang.MutableInteger;
import com.yahoo.transaction.Mutex;
import com.yahoo.vespa.flags.FlagSource;
import com.yahoo.vespa.flags.ListFlag;
import com.yahoo.vespa.flags.PermanentFlags;
import com.yahoo.vespa.flags.custom.ClusterCapacity;
import com.yahoo.vespa.hosted.provision.LockedNodeList;
import com.yahoo.vespa.hosted.provision.Node;
import com.yahoo.vespa.hosted.provision.NodeList;
import com.yahoo.vespa.hosted.provision.NodeMutex;
import com.yahoo.vespa.hosted.provision.NodeRepository;
import com.yahoo.vespa.hosted.provision.node.Agent;
import com.yahoo.vespa.hosted.provision.node.History;
import com.yahoo.vespa.hosted.provision.node.IP;
import com.yahoo.vespa.hosted.provision.provisioning.HostProvisionRequest;
import com.yahoo.vespa.hosted.provision.provisioning.HostProvisioner;
import com.yahoo.vespa.hosted.provision.provisioning.HostProvisioner.HostSharing;
import com.yahoo.vespa.hosted.provision.provisioning.NodeCandidate;
import com.yahoo.vespa.hosted.provision.provisioning.NodePrioritizer;
import com.yahoo.vespa.hosted.provision.provisioning.NodeSpec;
import com.yahoo.vespa.hosted.provision.provisioning.ProvisioningThrottler;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import static java.util.Comparator.comparing;
import static java.util.Comparator.naturalOrder;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toSet;

/**
 * Provisions hosts to cover the capacity requested by the PREPROVISION_CAPACITY flag, and marks
 * excess hosts (hosts that have remained empty past their TTL) for removal.
 *
 * @author freva
 * @author mpolden
 */
public class HostCapacityMaintainer extends NodeRepositoryMaintainer {

    private static final Logger log = Logger.getLogger(HostCapacityMaintainer.class.getName());

    private final HostProvisioner hostProvisioner;
    private final ListFlag<ClusterCapacity> preprovisionCapacityFlag;
    private final ProvisioningThrottler throttler;

    HostCapacityMaintainer(NodeRepository nodeRepository,
                           Duration interval,
                           HostProvisioner hostProvisioner,
                           FlagSource flagSource,
                           Metric metric) {
        super(nodeRepository, interval, metric);
        this.hostProvisioner = hostProvisioner;
        this.preprovisionCapacityFlag = PermanentFlags.PREPROVISION_CAPACITY.bindTo(flagSource);
        this.throttler = new ProvisioningThrottler(nodeRepository, metric);
    }

    @Override
    protected double maintain() {
        List<Node> provisionedSnapshot;
        try {
            NodeList nodes;
            // Host and child nodes are written in separate transactions, but both are written while holding the
            // unallocated lock. Hold the unallocated lock while reading nodes to ensure we get all the children
            // of newly provisioned hosts.
            try (Mutex ignored = nodeRepository().nodes().lockUnallocated()) {
                nodes = nodeRepository().nodes().list();
            }
            provisionedSnapshot = provision(nodes);
        } catch (NodeAllocationException | IllegalStateException e) {
            log.log(Level.WARNING, "Failed to allocate preprovisioned capacity and/or find excess hosts: " + e.getMessage());
            return 0;  // avoid removing excess hosts
        } catch (RuntimeException e) {
            log.log(Level.WARNING, "Failed to allocate preprovisioned capacity and/or find excess hosts", e);
            return 0;  // avoid removing excess hosts
        }

        return markForRemoval(provisionedSnapshot);
    }

    private double markForRemoval(List<Node> provisionedSnapshot) {
        List<Node> emptyHosts = findEmptyOrRemovableHosts(provisionedSnapshot);
        if (emptyHosts.isEmpty()) return 1;

        int attempts = 0, success = 0;
        for (Set<Node> typeEmptyHosts : emptyHosts.stream().collect(groupingBy(Node::type, toSet())).values()) {
            attempts++;
            // All nodes in the list are hosts of the same type, so they use the same lock regardless of their allocation
            Optional<NodeMutex> appMutex = nodeRepository().nodes().lockAndGet(typeEmptyHosts.iterator().next(), Duration.ofSeconds(10));
            if (appMutex.isEmpty()) continue;
            try (Mutex lock = appMutex.get();
                 Mutex unallocatedLock = nodeRepository().nodes().lockUnallocated()) {
                // Re-read all nodes under lock and compute the candidates for removal. The actual set of nodes to
                // mark for removal is the intersection with typeEmptyHosts, which excludes the preprovisioned hosts.
                Map<Optional<String>, List<Node>> currentNodesByParent = nodeRepository().nodes().list().stream().collect(groupingBy(Node::parentHostname));
                List<Node> candidateHosts = new ArrayList<>(getHosts(currentNodesByParent));
                candidateHosts.retainAll(typeEmptyHosts);

                // Deprovision newly provisioned hosts before older ones to reduce churn
                candidateHosts.sort(Comparator.comparing((Node node) -> node.history().event(History.Event.Type.provisioned).map(History.Event::at).orElse(Instant.now()))
                                              .reversed());

                for (Node host : candidateHosts) {
                    attempts++;

                    // Any hosts that are no longer empty should be marked as such, and excluded from removal.
                    if (currentNodesByParent.getOrDefault(Optional.of(host.hostname()), List.of()).stream().anyMatch(n -> ! canDeprovision(n))
                        && host.hostEmptyAt().isPresent()) {
                        nodeRepository().nodes().write(host.withHostEmptyAt(null), lock);
                    }
                    // If the host is still empty, we can mark it as empty now, or mark it for removal if it has already expired.
                    else {
                        Instant now = clock().instant();
                        Node emptyHost = host.hostEmptyAt().isPresent() ? host : host.withHostEmptyAt(now);
                        boolean expired = ! now.isBefore(emptyHost.hostEmptyAt().get().plus(host.hostTTL().orElse(Duration.ZERO)));

                        if (expired && canRemoveHost(emptyHost)) {
                            // Retire the host to parked if possible, otherwise move it straight to parked.
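                            // Hosts in reserved/active/inactive must be drained through retirement; hosts in any
                            // other state can be parked immediately.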
                            if (EnumSet.of(Node.State.reserved, Node.State.active, Node.State.inactive).contains(host.state())) {
                                emptyHost = emptyHost.withWantToRetire(true, true, Agent.HostCapacityMaintainer, now);
                                nodeRepository().nodes().write(emptyHost, lock);
                            } else {
                                if (emptyHost != host) nodeRepository().nodes().write(emptyHost, lock);
                                nodeRepository().nodes().park(host.hostname(), true, Agent.HostCapacityMaintainer, "Parked for removal");
                            }
                        } else {
                            if (emptyHost != host) nodeRepository().nodes().write(emptyHost, lock);
                        }
                    }

                    success++;
                }
            } catch (UncheckedTimeoutException e) {
                log.log(Level.WARNING, "Failed to mark excess hosts for deprovisioning: Failed to get lock, will retry later");
            }
            success++;
        }
        return asSuccessFactorDeviation(attempts, attempts - success);
    }

    private List<Node> provision(NodeList nodeList) {
        return provisionUntilNoDeficit(nodeList).stream()
                                                .sorted(comparing(node -> node.history().events().stream()
                                                                              .map(History.Event::at)
                                                                              .min(naturalOrder())
                                                                              .orElse(Instant.MIN)))
                                                .toList();
    }

    private static boolean canRemoveHost(Node host) {
        return switch (host.type()) {
            // TODO: Mark empty tenant hosts as wanttoretire & wanttodeprovision elsewhere, then handle as confighost here
            case host -> host.state() != Node.State.deprovisioned
                         && (host.state() != Node.State.parked || host.status().wantToDeprovision());
            case confighost, controllerhost -> canDeprovision(host);
            default -> false;
        };
    }

    static boolean canDeprovision(Node node) {
        return node.status().wantToDeprovision() && (node.state() == Node.State.parked ||
                                                     node.state() == Node.State.failed);
    }

    /**
     * @return the nodes in {@code nodeList} plus all hosts provisioned, plus all preprovision capacity
     *         nodes that were allocated.
     * @throws NodeAllocationException if there were problems provisioning hosts; the message alone
     *         should be sufficient for diagnosis (no stack trace needed)
     * @throws IllegalStateException if there was an algorithmic problem; the message alone
     *         should be sufficient for diagnosis (no stack trace needed).
     */
    private List<Node> provisionUntilNoDeficit(NodeList nodeList) {
        List<ClusterCapacity> preprovisionCapacity = preprovisionCapacityFlag.value();
        ApplicationId application = ApplicationId.defaultId();

        // Worst-case each ClusterCapacity in preprovisionCapacity will require an allocation.
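        // Each iteration of the loop below either terminates or provisions hosts for one deficit, so needing
        // more than preprovisionCapacity.size() provisioning rounds means the deficit is not shrinking: a bug.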
        int maxProvisions = preprovisionCapacity.size();

        var nodesPlusProvisioned = new ArrayList<>(nodeList.asList());
        for (int numProvisions = 0;; ++numProvisions) {
            var nodesPlusProvisionedPlusAllocated = new ArrayList<>(nodesPlusProvisioned);
            Optional<ClusterCapacity> deficit = allocatePreprovisionCapacity(application, preprovisionCapacity, nodesPlusProvisionedPlusAllocated);
            if (deficit.isEmpty()) {
                return nodesPlusProvisionedPlusAllocated;
            }

            if (numProvisions >= maxProvisions) {
                throw new IllegalStateException("Have provisioned " + numProvisions + " times but there's still deficit: aborting");
            }

            ClusterCapacity clusterCapacityDeficit = deficit.get();
            nodesPlusProvisioned.addAll(provisionHosts(clusterCapacityDeficit.count(),
                                                       toNodeResources(clusterCapacityDeficit),
                                                       Optional.ofNullable(clusterCapacityDeficit.clusterType()),
                                                       nodeList));
        }
    }

    private List<Node> provisionHosts(int count, NodeResources nodeResources, Optional<String> clusterType, NodeList allNodes) {
        try {
            if (throttler.throttle(allNodes, Agent.HostCapacityMaintainer)) {
                throw new NodeAllocationException("Host provisioning is being throttled", true);
            }

            Version osVersion = nodeRepository().osVersions().targetFor(NodeType.host).orElse(Version.emptyVersion);
            List<Integer> provisionIndices = nodeRepository().database().readProvisionIndices(count);
            HostSharing sharingMode = nodeRepository().exclusiveAllocation(asSpec(clusterType, 0)) ? HostSharing.exclusive : HostSharing.shared;
            HostProvisionRequest request = new HostProvisionRequest(provisionIndices, NodeType.host, nodeResources,
                                                                    ApplicationId.defaultId(), osVersion, sharingMode,
                                                                    clusterType.map(ClusterSpec.Type::valueOf), Optional.empty(),
                                                                    nodeRepository().zone().cloud().account(), false);
            List<Node> hosts = new ArrayList<>();
            Runnable waiter;
            try (var lock = nodeRepository().nodes().lockUnallocated()) {
                waiter = hostProvisioner.provisionHosts(request,
                                                        resources -> true,
                                                        provisionedHosts -> {
                                                            hosts.addAll(provisionedHosts.stream()
                                                                                         .map(host -> host.generateHost(Duration.ZERO))
                                                                                         .map(host -> host.withExclusiveToApplicationId(null))
                                                                                         .toList());
                                                            nodeRepository().nodes().addNodes(hosts, Agent.HostCapacityMaintainer);
                                                        });
            }
            waiter.run();
            return hosts;
        } catch (NodeAllocationException | IllegalArgumentException | IllegalStateException e) {
            throw new NodeAllocationException("Failed to provision " + count + " " + nodeResources + ": " + e.getMessage(),
                                              ! (e instanceof NodeAllocationException nae) || nae.retryable());
        } catch (RuntimeException e) {
            throw new RuntimeException("Failed to provision " + count + " " + nodeResources + ", will retry in " + interval(), e);
        }
    }

    /**
     * Try to allocate the preprovision cluster capacity.
     *
     * @param mutableNodes represents all nodes in the node repo.
     *                     As preprovision capacity is virtually allocated, the allocated nodes are added to {@code mutableNodes}
     * @return the part of a cluster capacity it was unable to allocate, if any
     */
    private Optional<ClusterCapacity> allocatePreprovisionCapacity(ApplicationId application,
                                                                   List<ClusterCapacity> preprovisionCapacity,
                                                                   ArrayList<Node> mutableNodes) {
        for (int clusterIndex = 0; clusterIndex < preprovisionCapacity.size(); ++clusterIndex) {
            ClusterCapacity clusterCapacity = preprovisionCapacity.get(clusterIndex);
            LockedNodeList allNodes = new LockedNodeList(mutableNodes, () -> {});
            List<Node> candidates = findCandidates(application, clusterCapacity, clusterIndex, allNodes);
            int deficit = Math.max(0, clusterCapacity.count() - candidates.size());
            if (deficit > 0) {
                return Optional.of(clusterCapacity.withCount(deficit));
            }

            // Simulate allocating the cluster
            mutableNodes.addAll(candidates);
        }

        return Optional.empty();
    }

    private List<Node> findCandidates(ApplicationId application, ClusterCapacity clusterCapacity, int clusterIndex, LockedNodeList allNodes) {
        NodeResources nodeResources = toNodeResources(clusterCapacity);

        // We'll allocate each ClusterCapacity as a unique cluster in a dummy application
        ClusterSpec cluster = asSpec(Optional.ofNullable(clusterCapacity.clusterType()), clusterIndex);
        NodeSpec nodeSpec = NodeSpec.from(clusterCapacity.count(), 1, nodeResources, false, true,
                                          nodeRepository().zone().cloud().account(), Duration.ZERO);
        var allocationContext = IP.Allocation.Context.from(nodeRepository().zone().cloud().name(),
                                                           nodeSpec.cloudAccount().isExclave(nodeRepository().zone()),
                                                           nodeRepository().nameResolver());
        NodePrioritizer prioritizer = new NodePrioritizer(allNodes, application, cluster, nodeSpec, true, false,
                                                          allocationContext, nodeRepository().nodes(),
                                                          nodeRepository().resourcesCalculator(),
                                                          nodeRepository().spareCount(),
                                                          nodeRepository().exclusiveAllocation(cluster));
        List<NodeCandidate> nodeCandidates = prioritizer.collect()
                                                        .stream()
                                                        .filter(node -> node.violatesExclusivity(cluster,
                                                                                                 application,
                                                                                                 nodeRepository().exclusiveClusterType(cluster),
                                                                                                 nodeRepository().exclusiveAllocation(cluster),
                                                                                                 false,
                                                                                                 nodeRepository().zone().cloud().allowHostSharing(),
                                                                                                 allNodes)
                                                                        != NodeCandidate.ExclusivityViolation.YES)
                                                        .toList();
        MutableInteger index = new MutableInteger(0);
        return nodeCandidates
                .stream()
                .limit(clusterCapacity.count())
                .map(candidate -> candidate.toNode()
                                           .allocate(application,
                                                     ClusterMembership.from(cluster, index.next()),
                                                     nodeResources,
                                                     nodeRepository().clock().instant()))
                .toList();
    }

    private static ClusterSpec asSpec(Optional<String> clusterType, int index) {
        return ClusterSpec.request(clusterType.map(ClusterSpec.Type::from).orElse(ClusterSpec.Type.content),
                                   ClusterSpec.Id.from(String.valueOf(index)))
                          .vespaVersion(Vtag.currentVersion)  // Needed, but should not be used here.
                          .build();
    }

    private static NodeResources toNodeResources(ClusterCapacity clusterCapacity) {
        return new NodeResources(clusterCapacity.vcpu(),
                                 clusterCapacity.memoryGb(),
                                 clusterCapacity.diskGb(),
                                 clusterCapacity.bandwidthGbps(),
                                 NodeResources.DiskSpeed.valueOf(clusterCapacity.diskSpeed()),
                                 NodeResources.StorageType.valueOf(clusterCapacity.storageType()),
                                 NodeResources.Architecture.valueOf(clusterCapacity.architecture()));
    }

    private static List<Node> findEmptyOrRemovableHosts(List<Node> provisionedSnapshot) {
        // Group nodes by parent; no parent means it's a host.
        var nodesByParent = provisionedSnapshot.stream().collect(groupingBy(Node::parentHostname));

        // Find all hosts that we once thought were empty (first clause), or whose children are now all removable (second clause).
        return getHosts(nodesByParent).stream()
                                      .filter(host -> host.hostEmptyAt().isPresent() || allChildrenCanBeDeprovisioned(nodesByParent, host))
                                      .toList();
    }

    private static List<Node> getHosts(Map<Optional<String>, List<Node>> nodesByParent) {
        return nodesByParent.get(Optional.empty());
    }

    private static List<Node> getChildren(Map<Optional<String>, List<Node>> nodesByParent, Node host) {
        return nodesByParent.getOrDefault(Optional.of(host.hostname()), List.of());
    }

    private static boolean allChildrenCanBeDeprovisioned(Map<Optional<String>, List<Node>> nodesByParent, Node host) {
        return getChildren(nodesByParent, host).stream().allMatch(HostCapacityMaintainer::canDeprovision);
    }

}