diff options
author | jonmv <venstad@gmail.com> | 2023-10-18 16:35:17 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-10-19 08:58:57 +0200 |
commit | 376c397211bda8f201d518b399f0e36bfdc17adf (patch) | |
tree | 9ffe318b106d2f5ae1836b9fc5b59cab7b20c208 /node-repository | |
parent | 58106d4418286685787c66603358e6e9fa34263a (diff) |
Add reactive infra-application redeployer, triggered when hosts complete provisioning
Diffstat (limited to 'node-repository')
7 files changed, 307 insertions, 12 deletions
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/HostCapacityMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/HostCapacityMaintainer.java index f260832ef32..b2dde608ed2 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/HostCapacityMaintainer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/HostCapacityMaintainer.java @@ -7,7 +7,6 @@ import com.yahoo.concurrent.UncheckedTimeoutException; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.ClusterMembership; import com.yahoo.config.provision.ClusterSpec; -import com.yahoo.config.provision.ClusterSpec.Type; import com.yahoo.config.provision.NodeAllocationException; import com.yahoo.config.provision.NodeResources; import com.yahoo.config.provision.NodeType; diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/InfraApplicationRedeployer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/InfraApplicationRedeployer.java new file mode 100644 index 00000000000..d42c754c108 --- /dev/null +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/InfraApplicationRedeployer.java @@ -0,0 +1,117 @@ +package com.yahoo.vespa.hosted.provision.maintenance; + +import com.yahoo.component.AbstractComponent; +import com.yahoo.component.annotation.Inject; +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.concurrent.UncheckedTimeoutException; +import com.yahoo.config.provision.ApplicationId; +import com.yahoo.config.provision.Deployment; +import com.yahoo.config.provision.InfraDeployer; +import com.yahoo.config.provision.NodeType; +import com.yahoo.transaction.Mutex; +import com.yahoo.vespa.applicationmodel.InfrastructureApplication; +import com.yahoo.vespa.hosted.provision.Node.State; +import com.yahoo.vespa.hosted.provision.NodeList; +import com.yahoo.vespa.hosted.provision.NodeRepository; +import com.yahoo.vespa.service.duper.TenantHostApplication; + +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.logging.Logger; + +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; + +/** + * Performs on-demand redeployment of the {@link InfrastructureApplication}s, to minimise time between + * host provisioning for a deployment completing, and deployment of its application containers succeeding. + * + * @author jonmv + */ +public class InfraApplicationRedeployer extends AbstractComponent { + + private static final Logger log = Logger.getLogger(InfraApplicationRedeployer.class.getName()); + + private final ExecutorService executor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("infra-application-redeployer-")); + private final Set<InfrastructureApplication> readiedTypes = new ConcurrentSkipListSet<>(); + private final InfraDeployer deployer; + private final Function<ApplicationId, Mutex> locks; + private final Supplier<NodeList> nodes; + + @Inject + public InfraApplicationRedeployer(InfraDeployer deployer, NodeRepository nodes) { + this(deployer, nodes.applications()::lockMaintenance, nodes.nodes()::list); + } + + InfraApplicationRedeployer(InfraDeployer deployer, Function<ApplicationId, Mutex> locks, Supplier<NodeList> nodes) { + this.deployer = deployer; + this.locks = locks; + this.nodes = nodes; + } + + public void readied(NodeType type) { + readied(applicationOf(type)); + } + + private void readied(InfrastructureApplication application) { + if (application == null) return; + if (readiedTypes.add(application)) executor.execute(() -> checkAndRedeploy(application)); + } + + private void checkAndRedeploy(InfrastructureApplication application) { + log.log(INFO, () -> "Checking if " + application.name() + " should be redeployed"); + if ( ! readiedTypes.remove(application)) return; + log.log(INFO, () -> "Trying to redeploy " + application.id() + " after completing provisioning of " + application.name()); + try (Mutex lock = locks.apply(application.id())) { + if (application.nodeType().isHost() && nodes.get().state(State.ready).nodeType(application.nodeType()).isEmpty()) return; + log.log(INFO, () -> "Redeploying " + application.id() + " after completing provisioning of " + application.name()); + try { + deployer.getDeployment(application.id()).ifPresent(Deployment::activate); + if (childOf(application) != null) readied(childOf(application)); + } + catch (RuntimeException e) { + log.log(WARNING, "Failed redeploying " + application.id() + ", will be retried by maintainer", e); + } + } + catch (UncheckedTimeoutException collision) { + readied(application); + } + } + + private static InfrastructureApplication applicationOf(NodeType type) { + return switch (type) { + case host -> InfrastructureApplication.TENANT_HOST; + case confighost -> InfrastructureApplication.CONFIG_SERVER_HOST; + case controllerhost -> InfrastructureApplication.CONTROLLER_HOST; + case proxyhost -> InfrastructureApplication.PROXY_HOST; + default -> null; + }; + } + + private static InfrastructureApplication childOf(InfrastructureApplication application) { + return switch (application) { + case CONFIG_SERVER_HOST -> InfrastructureApplication.CONFIG_SERVER; + case CONTROLLER_HOST -> InfrastructureApplication.CONTROLLER; + default -> null; + }; + } + + @Override + public void deconstruct() { + executor.shutdown(); + try { + if (executor.awaitTermination(10, TimeUnit.SECONDS)) return; + log.log(WARNING, "Redeployer did not shut down within 10 seconds"); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + executor.shutdownNow(); + } + +} diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/node/Nodes.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/node/Nodes.java index 26c99501d04..38c1306a08a 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/node/Nodes.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/node/Nodes.java @@ -467,11 +467,12 @@ public class Nodes { } } - /* - * This method is used by the REST API to handle readying nodes for new allocations. For Linux - * containers this will remove the node from node repository, otherwise the node will be moved to state ready. + /** + * This method is used by the REST API to handle readying nodes for new allocations. + * Tenant containers will be removed, while other nodes will be moved to the ready state. + * Returns true if a node was updated, or false if the node was removed, or already was ready. */ - public Node markNodeAvailableForNewAllocation(String hostname, Agent agent, String reason) { + public boolean markNodeAvailableForNewAllocation(String hostname, Agent agent, String reason) { try (NodeMutex nodeMutex = lockAndGetRequired(hostname)) { Node node = nodeMutex.node(); if (node.type() == NodeType.tenant) { @@ -481,17 +482,18 @@ public class Nodes { NestedTransaction transaction = new NestedTransaction(); db.removeNodes(List.of(node), transaction); transaction.commit(); - return node; + return false; } - if (node.state() == Node.State.ready) return node; + if (node.state() == Node.State.ready) return false; Node parentHost = node.parentHostname().flatMap(this::node).orElse(node); List<String> failureReasons = NodeFailer.reasonsToFailHost(parentHost); if (!failureReasons.isEmpty()) illegal(node + " cannot be readied because it has hard failures: " + failureReasons); - return setReady(nodeMutex, agent, reason); + setReady(nodeMutex, agent, reason); + return true; } } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/InfraDeployerImpl.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/InfraDeployerImpl.java index 9cc1f2e05ef..39c14be4d2b 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/InfraDeployerImpl.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/InfraDeployerImpl.java @@ -13,7 +13,6 @@ import com.yahoo.config.provision.HostSpec; import com.yahoo.config.provision.InfraDeployer; import com.yahoo.config.provision.NodeType; import com.yahoo.config.provision.Provisioner; -import com.yahoo.transaction.Mutex; import com.yahoo.transaction.NestedTransaction; import com.yahoo.vespa.hosted.provision.NodeRepository; import com.yahoo.vespa.hosted.provision.maintenance.InfrastructureVersions; @@ -60,7 +59,7 @@ public class InfraDeployerImpl implements InfraDeployer { .forEach(api -> { var application = api.getApplicationId(); var deployment = new InfraDeployment(api); - try { + try (var lock = nodeRepository.applications().lockMaintenance(application)) { deployment.activate(); } catch (RuntimeException e) { logger.log(Level.INFO, "Failed to activate " + application, e); diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiHandler.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiHandler.java index 1ed138625ae..52d4cdf792b 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiHandler.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/NodesV2ApiHandler.java @@ -44,6 +44,7 @@ import com.yahoo.vespa.hosted.provision.node.filter.NodeHostFilter; import com.yahoo.vespa.hosted.provision.node.filter.NodeOsVersionFilter; import com.yahoo.vespa.hosted.provision.node.filter.NodeTypeFilter; import com.yahoo.vespa.hosted.provision.node.filter.ParentHostFilter; +import com.yahoo.vespa.hosted.provision.maintenance.InfraApplicationRedeployer; import com.yahoo.vespa.hosted.provision.restapi.NodesResponse.ResponseType; import com.yahoo.vespa.orchestrator.Orchestrator; import com.yahoo.yolean.Exceptions; @@ -75,13 +76,16 @@ public class NodesV2ApiHandler extends ThreadedHttpRequestHandler { private final Orchestrator orchestrator; private final NodeRepository nodeRepository; private final NodeFlavors nodeFlavors; + private final InfraApplicationRedeployer infraApplicationRedeployer; @Inject - public NodesV2ApiHandler(Context parentCtx, Orchestrator orchestrator, NodeRepository nodeRepository, NodeFlavors flavors) { + public NodesV2ApiHandler(Context parentCtx, Orchestrator orchestrator, NodeRepository nodeRepository, + NodeFlavors flavors, InfraApplicationRedeployer infraApplicationRedeployer) { super(parentCtx); this.orchestrator = orchestrator; this.nodeRepository = nodeRepository; this.nodeFlavors = flavors; + this.infraApplicationRedeployer = infraApplicationRedeployer; } @Override @@ -138,7 +142,8 @@ public class NodesV2ApiHandler extends ThreadedHttpRequestHandler { Path path = new Path(request.getUri()); // Check paths to disallow illegal state changes if (path.matches("/nodes/v2/state/ready/{hostname}")) { - nodeRepository.nodes().markNodeAvailableForNewAllocation(path.get("hostname"), agent(request), "Readied through the nodes/v2 API"); + if (nodeRepository.nodes().markNodeAvailableForNewAllocation(path.get("hostname"), agent(request), "Readied through the nodes/v2 API")) + infraApplicationRedeployer.readied(nodeRepository.nodes().node(path.get("hostname")).get().type()); return new MessageResponse("Moved " + path.get("hostname") + " to " + Node.State.ready); } else if (path.matches("/nodes/v2/state/failed/{hostname}")) { diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/testutils/ContainerConfig.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/testutils/ContainerConfig.java index f653416d973..8039d1450d9 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/testutils/ContainerConfig.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/testutils/ContainerConfig.java @@ -38,6 +38,7 @@ public class ContainerConfig { <component id='com.yahoo.vespa.hosted.provision.testutils.MockMetricsFetcher'/> <component id='com.yahoo.vespa.hosted.provision.testutils.MockNodeRepository'/> <component id='com.yahoo.vespa.hosted.provision.testutils.MockProvisionServiceProvider'/> + <component id='com.yahoo.vespa.hosted.provision.maintenance.InfraApplicationRedeployer'/> <component id='com.yahoo.vespa.hosted.provision.maintenance.NodeRepositoryMaintenance'/> <component id='com.yahoo.vespa.flags.InMemoryFlagSource'/> <component id='com.yahoo.config.provision.Zone'/> diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/InfraApplicationRedeployerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/InfraApplicationRedeployerTest.java new file mode 100644 index 00000000000..bd8607ef327 --- /dev/null +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/InfraApplicationRedeployerTest.java @@ -0,0 +1,172 @@ +package com.yahoo.vespa.hosted.provision.maintenance; + +import com.yahoo.concurrent.UncheckedTimeoutException; +import com.yahoo.config.provision.ApplicationId; +import com.yahoo.config.provision.Deployment; +import com.yahoo.config.provision.Flavor; +import com.yahoo.config.provision.InfraDeployer; +import com.yahoo.config.provision.NodeResources; +import com.yahoo.config.provision.NodeType; +import com.yahoo.transaction.Mutex; +import com.yahoo.vespa.applicationmodel.InfrastructureApplication; +import com.yahoo.vespa.hosted.provision.Node; +import com.yahoo.vespa.hosted.provision.Node.State; +import com.yahoo.vespa.hosted.provision.NodeList; +import com.yahoo.vespa.hosted.provision.node.IP; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * @author jonmv + */ +class InfraApplicationRedeployerTest { + + private static final ApplicationId cfghost = InfrastructureApplication.CONFIG_SERVER_HOST.id(); + private static final ApplicationId cfg = InfrastructureApplication.CONFIG_SERVER.id(); + private static final ApplicationId tenanthost = InfrastructureApplication.TENANT_HOST.id(); + + @Test + void testMultiTriggering() throws InterruptedException { + TestLocks locks = new TestLocks(); + List<Node> nodes = new CopyOnWriteArrayList<>(); + TestInfraDeployer deployer = new TestInfraDeployer(); + InfraApplicationRedeployer redeployer = new InfraApplicationRedeployer(deployer, locks::get, () -> NodeList.copyOf(nodes)); + Phaser intro = new Phaser(2); + CountDownLatch intermezzo = new CountDownLatch(1), outro = new CountDownLatch(1); + + // First run does nothing, as no nodes are ready after all, but several new runs are triggered as this ends. + locks.expect(tenanthost, () -> () -> { intro.arriveAndAwaitAdvance(); intro.arriveAndAwaitAdvance(); }); + redeployer.readied(NodeType.host); + intro.arriveAndAwaitAdvance(); // Wait for redeployer to start, before setting up more state. + // Before re-triggered events from first tenanthost run, we also trigger for confighost, which should then run before those. + locks.expect(cfghost, () -> () -> { }); + redeployer.readied(NodeType.confighost); + for (int i = 0; i < 10000; i++) redeployer.readied(NodeType.host); + nodes.add(node("host", NodeType.host, State.ready)); + // Re-run for tenanthost clears host from ready, and next run does nothing. + deployer.expect(tenanthost, () -> { + nodes.clear(); + return Optional.empty(); + }); + locks.expect(tenanthost, () -> intermezzo::countDown); + intro.arriveAndAwaitAdvance(); // Let redeployer continue. + intermezzo.await(10, TimeUnit.SECONDS); // Rendezvous with last, no-op tenanthost redeployment. + locks.verify(); + deployer.verify(); + + // Confighost is triggered again with one ready host. Both applications deploy, and a new trigger redeploys neither. + locks.expect(cfghost, () -> () -> { }); + locks.expect(cfg, () -> () -> { }); + nodes.add(node("cfghost", NodeType.confighost, State.ready)); + deployer.expect(cfghost, () -> { + nodes.clear(); + return Optional.empty(); + }); + deployer.expect(cfg, () -> { + redeployer.readied(NodeType.confighost); + return Optional.empty(); + }); + locks.expect(cfghost, () -> outro::countDown); + redeployer.readied(NodeType.confighost); + + outro.await(10, TimeUnit.SECONDS); + redeployer.deconstruct(); + locks.verify(); + deployer.verify(); + } + + @Test + void testRetries() throws InterruptedException { + TestLocks locks = new TestLocks(); + List<Node> nodes = new CopyOnWriteArrayList<>(); + TestInfraDeployer deployer = new TestInfraDeployer(); + InfraApplicationRedeployer redeployer = new InfraApplicationRedeployer(deployer, locks::get, () -> NodeList.copyOf(nodes)); + + // Does nothing. + redeployer.readied(NodeType.tenant); + + // Getting lock fails with runtime exception; no deployments, no retries. + locks.expect(tenanthost, () -> { throw new RuntimeException("Failed"); }); + redeployer.readied(NodeType.host); + + // Getting lock times out for configserver application; deployment of configserverapp is retried, but host is done. + CountDownLatch latch = new CountDownLatch(1); + locks.expect(cfghost, () -> () -> { }); + locks.expect(cfg, () -> { throw new UncheckedTimeoutException("Timeout"); }); + locks.expect(cfg, () -> latch::countDown); + nodes.add(node("cfghost", NodeType.confighost, State.ready)); + deployer.expect(cfghost, () -> { + nodes.set(0, node("cfghost", NodeType.confighost, State.active)); + return Optional.empty(); + }); + deployer.expect(cfg, Optional::empty); + redeployer.readied(NodeType.confighost); + latch.await(10, TimeUnit.SECONDS); + redeployer.deconstruct(); + locks.verify(); + deployer.verify(); + } + + private static Node node(String name, NodeType type, State state) { + return Node.create(name, name, new Flavor(NodeResources.unspecified()), state, type) + .ipConfig(IP.Config.of(List.of("1.2.3.4"), List.of("1.2.3.4"))) + .build(); + } + + private static class Expectations<T, R> { + + final Queue<T> expected = new ConcurrentLinkedQueue<>(); + final Queue<Throwable> stacks = new ConcurrentLinkedQueue<>(); + final Queue<Supplier<R>> reactions = new ConcurrentLinkedQueue<>(); + final AtomicReference<Throwable> failure = new AtomicReference<>(); + + void expect(T id, Supplier<R> reaction) { + expected.add(id); + stacks.add(new AssertionError("Failed expectation of " + id)); + reactions.add(reaction); + } + + R get(T id) { + Throwable s = stacks.poll(); + if (s == null) s = new AssertionError("Unexpected invocation with " + id); + try { assertEquals(expected.poll(), id); } + catch (Throwable t) { + StackTraceElement[] trace = t.getStackTrace(); + t.setStackTrace(s.getStackTrace()); + s.setStackTrace(trace); + t.addSuppressed(s); + if ( ! failure.compareAndSet(null, t)) failure.get().addSuppressed(t); + throw t; + } + return reactions.poll().get(); + } + + @SuppressWarnings("unchecked") + <E extends Throwable> void verify() throws E { + if (failure.get() != null) throw (E) failure.get(); + assertEquals(List.of(), List.copyOf(expected)); + } + + } + + private static class TestLocks extends Expectations<ApplicationId, Mutex> { } + + private static class TestInfraDeployer extends Expectations<ApplicationId, Optional<Deployment>> implements InfraDeployer { + @Override public Optional<Deployment> getDeployment(ApplicationId application) { return get(application); } + @Override public void activateAllSupportedInfraApplications(boolean propagateException) { fail(); } + } + +} |