diff options
author | Valerij Fredriksen <valerijf@oath.com> | 2019-01-09 15:13:24 +0100 |
---|---|---|
committer | Valerij Fredriksen <valerijf@oath.com> | 2019-01-09 18:50:56 +0100 |
commit | 0cf7ec5d521e1595289219fadf98db514b353c1c (patch) | |
tree | 9460c2972f24d6d78b9e7b8a27b8b7e360dfc7bd /node-admin/src | |
parent | f724bcfa584c590ad079cd2614aa21acc835e285 (diff) |
Use NodeAgentContextSupplier in NodeAgent
Diffstat (limited to 'node-admin/src')
3 files changed, 159 insertions, 260 deletions
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgent.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgent.java index 947e7c85d66..10076c4f48a 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgent.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgent.java @@ -9,12 +9,6 @@ package com.yahoo.vespa.hosted.node.admin.nodeagent; * @author bakksjo */ public interface NodeAgent { - /** - * Will eventually freeze/unfreeze the node agent - * @param frozen whether node agent should be frozen - * @return True if node agent has converged to the desired state - */ - boolean setFrozen(boolean frozen); /** * Stop services running on node. Depending on the state of the node, {@link #suspend()} might need to be diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java index 98975dddb56..ce1acea7d39 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java @@ -17,20 +17,17 @@ import com.yahoo.vespa.hosted.dockerapi.metrics.Dimensions; import com.yahoo.vespa.hosted.dockerapi.metrics.MetricReceiverWrapper; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeSpec; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeAttributes; +import com.yahoo.vespa.hosted.node.admin.configserver.orchestrator.OrchestratorException; import com.yahoo.vespa.hosted.node.admin.docker.DockerOperations; import com.yahoo.vespa.hosted.node.admin.maintenance.StorageMaintainer; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeRepository; import com.yahoo.vespa.hosted.node.admin.configserver.orchestrator.Orchestrator; -import com.yahoo.vespa.hosted.node.admin.configserver.orchestrator.OrchestratorException; import com.yahoo.vespa.hosted.node.admin.maintenance.acl.AclMaintainer; import com.yahoo.vespa.hosted.node.admin.maintenance.identity.AthenzCredentialsMaintainer; import com.yahoo.vespa.hosted.node.admin.nodeadmin.ConvergenceException; import com.yahoo.vespa.hosted.node.admin.util.SecretAgentCheckConfig; import com.yahoo.vespa.hosted.provision.Node; -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -59,30 +56,21 @@ public class NodeAgentImpl implements NodeAgent { private static final Logger logger = Logger.getLogger(NodeAgentImpl.class.getName()); - private final Object monitor = new Object(); private final AtomicBoolean terminated = new AtomicBoolean(false); - - private boolean isFrozen = true; - private boolean wantFrozen = false; - private boolean workToDoNow = true; - private boolean expectNodeNotInNodeRepo = false; private boolean hasResumedNode = false; private boolean hasStartedServices = true; - private final NodeAgentContext context; + private final NodeAgentContextSupplier contextSupplier; private final NodeRepository nodeRepository; private final Orchestrator orchestrator; private final DockerOperations dockerOperations; private final StorageMaintainer storageMaintainer; - private final Clock clock; - private final Duration timeBetweenEachConverge; private final Optional<AthenzCredentialsMaintainer> athenzCredentialsMaintainer; private final Optional<AclMaintainer> aclMaintainer; private final Optional<HealthChecker> healthChecker; private int numberOfUnhandledException = 0; private DockerImage imageBeingDownloaded = null; - private Instant lastConverge; private long currentRebootGeneration = 0; private Optional<Long> currentRestartGeneration = Optional.empty(); @@ -115,24 +103,19 @@ public class NodeAgentImpl implements NodeAgent { // Created in NodeAdminImpl public NodeAgentImpl( - final NodeAgentContext context, + final NodeAgentContextSupplier contextSupplier, final NodeRepository nodeRepository, final Orchestrator orchestrator, final DockerOperations dockerOperations, final StorageMaintainer storageMaintainer, - final Clock clock, - final Duration timeBetweenEachConverge, final Optional<AthenzCredentialsMaintainer> athenzCredentialsMaintainer, final Optional<AclMaintainer> aclMaintainer, final Optional<HealthChecker> healthChecker) { - this.context = context; + this.contextSupplier = contextSupplier; this.nodeRepository = nodeRepository; this.orchestrator = orchestrator; this.dockerOperations = dockerOperations; this.storageMaintainer = storageMaintainer; - this.clock = clock; - this.timeBetweenEachConverge = timeBetweenEachConverge; - this.lastConverge = clock.instant(); this.athenzCredentialsMaintainer = athenzCredentialsMaintainer; this.aclMaintainer = aclMaintainer; this.healthChecker = healthChecker; @@ -140,16 +123,15 @@ public class NodeAgentImpl implements NodeAgent { this.loopThread = new Thread(() -> { while (!terminated.get()) { try { - tick(); - } catch (Throwable t) { - numberOfUnhandledException++; - context.log(logger, LogLevel.ERROR, "Unhandled throwable, ignoring", t); - } + NodeAgentContext context = contextSupplier.nextContext(); + converge(context); + } catch (InterruptedException ignored) { } } }); - this.loopThread.setName("tick-" + context.hostname()); + this.loopThread.setName("tick-" + contextSupplier.currentContext().hostname()); this.serviceRestarter = service -> { + NodeAgentContext context = contextSupplier.currentContext(); try { ProcessResult processResult = dockerOperations.executeCommandInContainerAsRoot( context, "service", service, "restart"); @@ -164,46 +146,29 @@ public class NodeAgentImpl implements NodeAgent { } @Override - public boolean setFrozen(boolean frozen) { - synchronized (monitor) { - if (wantFrozen != frozen) { - wantFrozen = frozen; - context.log(logger, LogLevel.DEBUG, wantFrozen ? "Freezing" : "Unfreezing"); - signalWorkToBeDone(); - } - - return isFrozen == frozen; - } - } - - @Override public void start() { - context.log(logger, "Starting with interval " + timeBetweenEachConverge.toMillis() + " ms"); loopThread.start(); } @Override public void stop() { - filebeatRestarter.shutdown(); if (!terminated.compareAndSet(false, true)) { throw new RuntimeException("Can not re-stop a node agent."); } - signalWorkToBeDone(); + filebeatRestarter.shutdown(); + contextSupplier.interrupt(); do { try { loopThread.join(); filebeatRestarter.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); - } catch (InterruptedException e) { - context.log(logger, LogLevel.ERROR, - "Interrupted while waiting for converge thread and filebeatRestarter scheduler to shutdown"); - } + } catch (InterruptedException ignored) { } } while (loopThread.isAlive() || !filebeatRestarter.isTerminated()); - context.log(logger, "Stopped"); + contextSupplier.currentContext().log(logger, "Stopped"); } - void startServicesIfNeeded() { + void startServicesIfNeeded(NodeAgentContext context) { if (!hasStartedServices) { context.log(logger, "Starting services"); dockerOperations.startServices(context); @@ -211,7 +176,7 @@ public class NodeAgentImpl implements NodeAgent { } } - void resumeNodeIfNeeded(NodeSpec node) { + void resumeNodeIfNeeded(NodeAgentContext context, NodeSpec node) { if (!hasResumedNode) { if (!currentFilebeatRestarter.isPresent()) { storageMaintainer.writeMetricsConfig(context, node); @@ -225,7 +190,7 @@ public class NodeAgentImpl implements NodeAgent { } } - private void updateNodeRepoWithCurrentAttributes(final NodeSpec node) { + private void updateNodeRepoWithCurrentAttributes(NodeAgentContext context, NodeSpec node) { final NodeAttributes currentNodeAttributes = new NodeAttributes(); final NodeAttributes newNodeAttributes = new NodeAttributes(); @@ -246,10 +211,10 @@ public class NodeAgentImpl implements NodeAgent { newNodeAttributes.withDockerImage(actualDockerImage.orElse(new DockerImage(""))); } - publishStateToNodeRepoIfChanged(currentNodeAttributes, newNodeAttributes); + publishStateToNodeRepoIfChanged(context, currentNodeAttributes, newNodeAttributes); } - private void publishStateToNodeRepoIfChanged(NodeAttributes currentAttributes, NodeAttributes newAttributes) { + private void publishStateToNodeRepoIfChanged(NodeAgentContext context, NodeAttributes currentAttributes, NodeAttributes newAttributes) { if (!currentAttributes.equals(newAttributes)) { context.log(logger, "Publishing new set of attributes to node repo: %s -> %s", currentAttributes, newAttributes); @@ -257,7 +222,7 @@ public class NodeAgentImpl implements NodeAgent { } } - private void startContainer(NodeSpec node) { + private void startContainer(NodeAgentContext context, NodeSpec node) { ContainerData containerData = createContainerData(context, node); dockerOperations.createContainer(context, node, containerData); dockerOperations.startContainer(context); @@ -268,13 +233,14 @@ public class NodeAgentImpl implements NodeAgent { context.log(logger, "Container successfully started, new containerState is " + containerState); } - private Optional<Container> removeContainerIfNeededUpdateContainerState(NodeSpec node, Optional<Container> existingContainer) { + private Optional<Container> removeContainerIfNeededUpdateContainerState( + NodeAgentContext context, NodeSpec node, Optional<Container> existingContainer) { return existingContainer - .flatMap(container -> removeContainerIfNeeded(node, container)) + .flatMap(container -> removeContainerIfNeeded(context, node, container)) .map(container -> { shouldRestartServices(node).ifPresent(restartReason -> { context.log(logger, "Will restart services: " + restartReason); - restartServices(node, container); + restartServices(context, node, container); currentRestartGeneration = node.getWantedRestartGeneration(); }); return container; @@ -292,17 +258,18 @@ public class NodeAgentImpl implements NodeAgent { return Optional.empty(); } - private void restartServices(NodeSpec node, Container existingContainer) { + private void restartServices(NodeAgentContext context, NodeSpec node, Container existingContainer) { if (existingContainer.state.isRunning() && node.getState() == Node.State.active) { context.log(logger, "Restarting services"); // Since we are restarting the services we need to suspend the node. - orchestratorSuspendNode(); + orchestratorSuspendNode(context); dockerOperations.restartVespa(context); } } @Override public void stopServices() { + NodeAgentContext context = contextSupplier.currentContext(); context.log(logger, "Stopping services"); if (containerState == ABSENT) return; try { @@ -315,6 +282,7 @@ public class NodeAgentImpl implements NodeAgent { @Override public void suspend() { + NodeAgentContext context = contextSupplier.currentContext(); context.log(logger, "Suspending services on node"); if (containerState == ABSENT) return; try { @@ -358,14 +326,14 @@ public class NodeAgentImpl implements NodeAgent { return Optional.empty(); } - private Optional<Container> removeContainerIfNeeded(NodeSpec node, Container existingContainer) { + private Optional<Container> removeContainerIfNeeded(NodeAgentContext context, NodeSpec node, Container existingContainer) { Optional<String> removeReason = shouldRemoveContainer(node, existingContainer); if (removeReason.isPresent()) { context.log(logger, "Will remove container: " + removeReason.get()); if (existingContainer.state.isRunning()) { if (node.getState() == Node.State.active) { - orchestratorSuspendNode(); + orchestratorSuspendNode(context); } try { @@ -399,78 +367,29 @@ public class NodeAgentImpl implements NodeAgent { } } - private void signalWorkToBeDone() { - synchronized (monitor) { - if (!workToDoNow) { - workToDoNow = true; - context.log(logger, LogLevel.DEBUG, "Signaling work to be done"); - monitor.notifyAll(); - } - } - } - - void tick() { - boolean isFrozenCopy; - synchronized (monitor) { - while (!workToDoNow) { - long remainder = timeBetweenEachConverge - .minus(Duration.between(lastConverge, clock.instant())) - .toMillis(); - if (remainder > 0) { - try { - monitor.wait(remainder); - } catch (InterruptedException e) { - context.log(logger, LogLevel.ERROR, "Interrupted while sleeping before tick, ignoring"); - } - } else break; - } - lastConverge = clock.instant(); - workToDoNow = false; - - if (isFrozen != wantFrozen) { - isFrozen = wantFrozen; - context.log(logger, "Updated NodeAgent's frozen state, new value: isFrozen: " + isFrozen); - } - isFrozenCopy = isFrozen; - } - - if (isFrozenCopy) { - context.log(logger, LogLevel.DEBUG, "tick: isFrozen"); - } else { - try { - converge(); - } catch (OrchestratorException | ConvergenceException e) { - context.log(logger, e.getMessage()); - } catch (ContainerNotFoundException e) { - containerState = ABSENT; - context.log(logger, LogLevel.WARNING, "Container unexpectedly gone, resetting containerState to " + containerState); - } catch (DockerException e) { - numberOfUnhandledException++; - context.log(logger, LogLevel.ERROR, "Caught a DockerException", e); - } catch (Exception e) { - numberOfUnhandledException++; - context.log(logger, LogLevel.ERROR, "Unhandled exception, ignoring.", e); - } + public void converge(NodeAgentContext context) { + try { + doConverge(context); + } catch (OrchestratorException | ConvergenceException e) { + context.log(logger, e.getMessage()); + } catch (ContainerNotFoundException e) { + containerState = ABSENT; + context.log(logger, LogLevel.WARNING, "Container unexpectedly gone, resetting containerState to " + containerState); + } catch (DockerException e) { + numberOfUnhandledException++; + context.log(logger, LogLevel.ERROR, "Caught a DockerException", e); + } catch (Throwable e) { + numberOfUnhandledException++; + context.log(logger, LogLevel.ERROR, "Unhandled exception, ignoring", e); } } // Public for testing - void converge() { - final Optional<NodeSpec> optionalNode = nodeRepository.getOptionalNode(context.hostname().value()); - - // We just removed the node from node repo, so this is expected until NodeAdmin stop this NodeAgent - if (!optionalNode.isPresent() && expectNodeNotInNodeRepo) { - context.log(logger, LogLevel.INFO, "Node removed from node repo (as expected)"); - return; - } - - final NodeSpec node = optionalNode.orElseThrow(() -> - new IllegalStateException(String.format("Node '%s' missing from node repository", context.hostname()))); - expectNodeNotInNodeRepo = false; - - Optional<Container> container = getContainer(); + void doConverge(NodeAgentContext context) { + NodeSpec node = context.node(); + Optional<Container> container = getContainer(context); if (!node.equals(lastNode)) { - logChangesToNodeSpec(lastNode, node); + logChangesToNodeSpec(context, lastNode, node); // Current reboot generation uninitialized or incremented from outside to cancel reboot if (currentRebootGeneration < node.getCurrentRebootGeneration()) @@ -496,8 +415,8 @@ public class NodeAgentImpl implements NodeAgent { case reserved: case parked: case failed: - removeContainerIfNeededUpdateContainerState(node, container); - updateNodeRepoWithCurrentAttributes(node); + removeContainerIfNeededUpdateContainerState(context, node, container); + updateNodeRepoWithCurrentAttributes(context, node); break; case active: storageMaintainer.handleCoreDumpsForContainer(context, node, container); @@ -512,17 +431,17 @@ public class NodeAgentImpl implements NodeAgent { context.log(logger, LogLevel.DEBUG, "Waiting for image to download " + imageBeingDownloaded.asString()); return; } - container = removeContainerIfNeededUpdateContainerState(node, container); + container = removeContainerIfNeededUpdateContainerState(context, node, container); athenzCredentialsMaintainer.ifPresent(maintainer -> maintainer.converge(context)); if (! container.isPresent()) { containerState = STARTING; - startContainer(node); + startContainer(context, node); containerState = UNKNOWN; aclMaintainer.ifPresent(AclMaintainer::converge); } - startServicesIfNeeded(); - resumeNodeIfNeeded(node); + startServicesIfNeeded(context); + resumeNodeIfNeeded(context, node); healthChecker.ifPresent(checker -> checker.verifyHealth(context)); // Because it's more important to stop a bad release from rolling out in prod, @@ -535,32 +454,31 @@ public class NodeAgentImpl implements NodeAgent { // has been successfully rolled out. // - Slobrok and internal orchestrator state is used to determine whether // to allow upgrade (suspend). - updateNodeRepoWithCurrentAttributes(node); + updateNodeRepoWithCurrentAttributes(context, node); context.log(logger, "Call resume against Orchestrator"); orchestrator.resume(context.hostname().value()); break; case inactive: - removeContainerIfNeededUpdateContainerState(node, container); - updateNodeRepoWithCurrentAttributes(node); + removeContainerIfNeededUpdateContainerState(context, node, container); + updateNodeRepoWithCurrentAttributes(context, node); break; case provisioned: nodeRepository.setNodeState(context.hostname().value(), Node.State.dirty); break; case dirty: - removeContainerIfNeededUpdateContainerState(node, container); + removeContainerIfNeededUpdateContainerState(context, node, container); context.log(logger, "State is " + node.getState() + ", will delete application storage and mark node as ready"); athenzCredentialsMaintainer.ifPresent(maintainer -> maintainer.clearCredentials(context)); storageMaintainer.archiveNodeStorage(context); - updateNodeRepoWithCurrentAttributes(node); + updateNodeRepoWithCurrentAttributes(context, node); nodeRepository.setNodeState(context.hostname().value(), Node.State.ready); - expectNodeNotInNodeRepo = true; break; default: throw new RuntimeException("UNKNOWN STATE " + node.getState().name()); } } - private void logChangesToNodeSpec(NodeSpec lastNode, NodeSpec node) { + private void logChangesToNodeSpec(NodeAgentContext context, NodeSpec lastNode, NodeSpec node) { StringBuilder builder = new StringBuilder(); appendIfDifferent(builder, "state", lastNode, node, NodeSpec::getState); if (builder.length() > 0) { @@ -592,8 +510,9 @@ public class NodeAgentImpl implements NodeAgent { @SuppressWarnings("unchecked") public void updateContainerNodeMetrics() { - final NodeSpec node = lastNode; - if (node == null || containerState != UNKNOWN) return; + if (containerState != UNKNOWN) return; + final NodeAgentContext context = contextSupplier.currentContext(); + final NodeSpec node = context.node(); Optional<ContainerStats> containerStats = dockerOperations.getContainerStats(context); if (!containerStats.isPresent()) return; @@ -660,10 +579,10 @@ public class NodeAgentImpl implements NodeAgent { metrics.add(networkMetrics); }); - pushMetricsToContainer(metrics); + pushMetricsToContainer(context, metrics); } - private void pushMetricsToContainer(List<DimensionMetrics> metrics) { + private void pushMetricsToContainer(NodeAgentContext context, List<DimensionMetrics> metrics) { StringBuilder params = new StringBuilder(); try { for (DimensionMetrics dimensionMetrics : metrics) { @@ -679,7 +598,7 @@ public class NodeAgentImpl implements NodeAgent { } } - private Optional<Container> getContainer() { + private Optional<Container> getContainer(NodeAgentContext context) { if (containerState == ABSENT) return Optional.empty(); Optional<Container> container = dockerOperations.getContainer(context); if (! container.isPresent()) containerState = ABSENT; @@ -743,7 +662,7 @@ public class NodeAgentImpl implements NodeAgent { // More generally, the node repo response should contain sufficient info on what the docker image is, // to allow the node admin to make decisions that depend on the docker image. Or, each docker image // needs to contain routines for drain and suspend. For many images, these can just be dummy routines. - private void orchestratorSuspendNode() { + private void orchestratorSuspendNode(NodeAgentContext context) { context.log(logger, "Ask Orchestrator for permission to suspend node"); orchestrator.suspend(context.hostname().value()); } diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java index b6128fc8693..aa1b72aa210 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java @@ -4,8 +4,8 @@ package com.yahoo.vespa.hosted.node.admin.nodeagent; import com.fasterxml.jackson.databind.ObjectMapper; import com.yahoo.config.provision.NodeType; import com.yahoo.metrics.simple.MetricReceiver; -import com.yahoo.test.ManualClock; import com.yahoo.vespa.hosted.dockerapi.Container; +import com.yahoo.vespa.hosted.dockerapi.ContainerName; import com.yahoo.vespa.hosted.dockerapi.ContainerResources; import com.yahoo.vespa.hosted.dockerapi.ContainerStats; import com.yahoo.vespa.hosted.dockerapi.exception.DockerException; @@ -13,6 +13,7 @@ import com.yahoo.vespa.hosted.dockerapi.DockerImage; import com.yahoo.vespa.hosted.dockerapi.metrics.MetricReceiverWrapper; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeSpec; import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeAttributes; +import com.yahoo.vespa.hosted.node.admin.configserver.orchestrator.OrchestratorException; import com.yahoo.vespa.hosted.node.admin.docker.DockerOperations; import com.yahoo.vespa.hosted.node.admin.maintenance.StorageMaintainer; import com.yahoo.vespa.hosted.node.admin.maintenance.acl.AclMaintainer; @@ -28,7 +29,6 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.time.Duration; import java.util.Collections; import java.util.Map; import java.util.Optional; @@ -36,8 +36,6 @@ import java.util.Set; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -56,14 +54,21 @@ import static org.mockito.Mockito.when; * @author Øyvind Bakksjø */ public class NodeAgentImplTest { - private static final Duration NODE_AGENT_SCAN_INTERVAL = Duration.ofSeconds(30); private static final double MIN_CPU_CORES = 2; private static final double MIN_MAIN_MEMORY_AVAILABLE_GB = 16; private static final double MIN_DISK_AVAILABLE_GB = 250; private static final String vespaVersion = "1.2.3"; private final String hostName = "host1.test.yahoo.com"; - private final NodeAgentContext context = new NodeAgentContextImpl.Builder(hostName).build(); + private final NodeSpec.Builder nodeBuilder = new NodeSpec.Builder() + .hostname(hostName) + .nodeType(NodeType.tenant) + .flavor("docker") + .minCpuCores(MIN_CPU_CORES) + .minMainMemoryAvailableGb(MIN_MAIN_MEMORY_AVAILABLE_GB) + .minDiskAvailableGb(MIN_DISK_AVAILABLE_GB); + + private final NodeAgentContextSupplier contextSupplier = mock(NodeAgentContextSupplier.class); private final DockerImage dockerImage = new DockerImage("dockerImage"); private final DockerOperations dockerOperations = mock(DockerOperations.class); private final NodeRepository nodeRepository = mock(NodeRepository.class); @@ -76,16 +81,6 @@ public class NodeAgentImplTest { Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); private final AthenzCredentialsMaintainer athenzCredentialsMaintainer = mock(AthenzCredentialsMaintainer.class); - private final ManualClock clock = new ManualClock(); - - private final NodeSpec.Builder nodeBuilder = new NodeSpec.Builder() - .hostname(context.hostname().value()) - .nodeType(NodeType.tenant) - .flavor("docker") - .minCpuCores(MIN_CPU_CORES) - .minMainMemoryAvailableGb(MIN_MAIN_MEMORY_AVAILABLE_GB) - .minDiskAvailableGb(MIN_DISK_AVAILABLE_GB); - @Test public void upToDateContainerIsUntouched() { @@ -97,11 +92,12 @@ public class NodeAgentImplTest { .vespaVersion(vespaVersion) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true); when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node)); when(storageMaintainer.getDiskUsageFor(eq(context))).thenReturn(Optional.of(187500000000L)); - nodeAgent.converge(); + nodeAgent.doConverge(context); verify(dockerOperations, never()).removeContainer(eq(context), any()); verify(orchestrator, never()).suspend(any(String.class)); @@ -125,11 +121,12 @@ public class NodeAgentImplTest { .vespaVersion(vespaVersion) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true); when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node)); when(storageMaintainer.getDiskUsageFor(eq(context))).thenReturn(Optional.of(217432719360L)); - nodeAgent.converge(); + nodeAgent.doConverge(context); verify(storageMaintainer, times(1)).removeOldFilesFromNode(eq(context)); } @@ -145,27 +142,28 @@ public class NodeAgentImplTest { .vespaVersion(vespaVersion) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true); when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node)); when(storageMaintainer.getDiskUsageFor(eq(context))).thenReturn(Optional.of(187500000000L)); - nodeAgent.converge(); + nodeAgent.doConverge(context); inOrder.verify(dockerOperations, never()).startServices(eq(context)); inOrder.verify(dockerOperations, times(1)).resumeNode(eq(context)); nodeAgent.suspend(); - nodeAgent.converge(); + nodeAgent.doConverge(context); inOrder.verify(dockerOperations, never()).startServices(eq(context)); inOrder.verify(dockerOperations, times(1)).resumeNode(eq(context)); // Expect a resume, but no start services // No new suspends/stops, so no need to resume/start - nodeAgent.converge(); + nodeAgent.doConverge(context); inOrder.verify(dockerOperations, never()).startServices(eq(context)); inOrder.verify(dockerOperations, never()).resumeNode(eq(context)); nodeAgent.suspend(); nodeAgent.stopServices(); - nodeAgent.converge(); + nodeAgent.doConverge(context); inOrder.verify(dockerOperations, times(1)).startServices(eq(context)); inOrder.verify(dockerOperations, times(1)).resumeNode(eq(context)); } @@ -181,13 +179,14 @@ public class NodeAgentImplTest { .currentRestartGeneration(restartGeneration.get()) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(null, false); when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node)); when(dockerOperations.pullImageAsyncIfNeeded(eq(dockerImage))).thenReturn(false); when(storageMaintainer.getDiskUsageFor(eq(context))).thenReturn(Optional.of(201326592000L)); - nodeAgent.converge(); + nodeAgent.doConverge(context); verify(dockerOperations, never()).removeContainer(eq(context), any()); verify(dockerOperations, never()).startServices(any()); @@ -216,13 +215,14 @@ public class NodeAgentImplTest { .vespaVersion(vespaVersion) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true); when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node)); when(dockerOperations.pullImageAsyncIfNeeded(any())).thenReturn(true); when(storageMaintainer.getDiskUsageFor(eq(context))).thenReturn(Optional.of(201326592000L)); - nodeAgent.converge(); + nodeAgent.doConverge(context); verify(orchestrator, never()).suspend(any(String.class)); verify(orchestrator, never()).resume(any(String.class)); @@ -241,29 +241,25 @@ public class NodeAgentImplTest { .wantedVespaVersion(vespaVersion) .vespaVersion(vespaVersion); + NodeAgentContext firstContext = createContext(specBuilder.build()); NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true); - NodeSpec firstSpec = specBuilder.build(); - NodeSpec secondSpec = specBuilder.minDiskAvailableGb(200).build(); - NodeSpec thirdSpec = specBuilder.minCpuCores(4).build(); - - when(nodeRepository.getOptionalNode(hostName)) - .thenReturn(Optional.of(firstSpec)) - .thenReturn(Optional.of(secondSpec)) - .thenReturn(Optional.of(thirdSpec)); + when(dockerOperations.pullImageAsyncIfNeeded(any())).thenReturn(true); - when(storageMaintainer.getDiskUsageFor(eq(context))).thenReturn(Optional.of(201326592000L)); + when(storageMaintainer.getDiskUsageFor(any())).thenReturn(Optional.of(201326592000L)); - nodeAgent.converge(); - nodeAgent.converge(); - nodeAgent.converge(); + nodeAgent.doConverge(firstContext); + NodeAgentContext secondContext = createContext(specBuilder.minDiskAvailableGb(200).build()); + nodeAgent.doConverge(secondContext); + NodeAgentContext thirdContext = createContext(specBuilder.minCpuCores(4).build()); + nodeAgent.doConverge(thirdContext); InOrder inOrder = inOrder(orchestrator, dockerOperations); inOrder.verify(orchestrator).resume(any(String.class)); inOrder.verify(orchestrator).resume(any(String.class)); inOrder.verify(orchestrator).suspend(any(String.class)); - inOrder.verify(dockerOperations).removeContainer(eq(context), any()); - inOrder.verify(dockerOperations, times(1)).createContainer(eq(context), eq(thirdSpec), any()); - inOrder.verify(dockerOperations).startContainer(eq(context)); + inOrder.verify(dockerOperations).removeContainer(eq(thirdContext), any()); + inOrder.verify(dockerOperations, times(1)).createContainer(eq(thirdContext), eq(thirdContext.node()), any()); + inOrder.verify(dockerOperations).startContainer(eq(thirdContext)); inOrder.verify(orchestrator).resume(any(String.class)); } @@ -281,12 +277,14 @@ public class NodeAgentImplTest { .currentRestartGeneration(currentRestartGeneration) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true); + doThrow(new OrchestratorException("Denied")).when(orchestrator).suspend(eq(hostName)); try { - nodeAgent.converge(); + nodeAgent.doConverge(context); fail("Expected to throw an exception"); - } catch (Exception ignored) { } + } catch (OrchestratorException ignored) { } verify(dockerOperations, never()).createContainer(eq(context), eq(node), any()); verify(dockerOperations, never()).startContainer(eq(context)); @@ -308,6 +306,7 @@ public class NodeAgentImplTest { .currentRebootGeneration(currentRebootGeneration) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true); when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node)); @@ -317,7 +316,7 @@ public class NodeAgentImplTest { .when(healthChecker).verifyHealth(eq(context)); try { - nodeAgent.converge(); + nodeAgent.doConverge(context); } catch (ConvergenceException ignored) {} // First time we fail to resume because health verification fails @@ -328,7 +327,7 @@ public class NodeAgentImplTest { verify(orchestrator, never()).resume(eq(hostName)); verify(nodeRepository, never()).updateNodeAttributes(any(), any()); - nodeAgent.converge(); + nodeAgent.doConverge(context); // Do not reboot the container again verify(dockerOperations, times(1)).removeContainer(eq(context), any()); @@ -348,11 +347,12 @@ public class NodeAgentImplTest { .vespaVersion(vespaVersion) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true); when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node)); - nodeAgent.converge(); + nodeAgent.doConverge(context); verify(dockerOperations, never()).removeContainer(eq(context), any()); verify(orchestrator, never()).resume(any(String.class)); @@ -365,13 +365,14 @@ public class NodeAgentImplTest { .state(Node.State.ready) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(null,false); when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node)); - nodeAgent.converge(); - nodeAgent.converge(); - nodeAgent.converge(); + nodeAgent.doConverge(context); + nodeAgent.doConverge(context); + nodeAgent.doConverge(context); // Should only be called once, when we initialize verify(dockerOperations, times(1)).getContainer(eq(context)); @@ -392,11 +393,12 @@ public class NodeAgentImplTest { .vespaVersion(vespaVersion) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true); when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node)); - nodeAgent.converge(); + nodeAgent.doConverge(context); final InOrder inOrder = inOrder(storageMaintainer, dockerOperations); inOrder.verify(dockerOperations, never()).removeContainer(eq(context), any()); @@ -413,11 +415,12 @@ public class NodeAgentImplTest { .wantedVespaVersion(vespaVersion) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(null, false); when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node)); - nodeAgent.converge(); + nodeAgent.doConverge(context); verify(nodeRepository, never()).updateNodeAttributes(eq(hostName), any()); } @@ -433,11 +436,12 @@ public class NodeAgentImplTest { .state(nodeState) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true); when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node)); - nodeAgent.converge(); + nodeAgent.doConverge(context); final InOrder inOrder = inOrder(storageMaintainer, dockerOperations, nodeRepository); inOrder.verify(dockerOperations, times(1)).stopServices(eq(context)); @@ -474,10 +478,11 @@ public class NodeAgentImplTest { .state(Node.State.provisioned) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(null, false); when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node)); - nodeAgent.converge(); + nodeAgent.doConverge(context); verify(nodeRepository, times(1)).setNodeState(eq(hostName), eq(Node.State.dirty)); } @@ -490,12 +495,13 @@ public class NodeAgentImplTest { .vespaVersion(vespaVersion) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, false); when(nodeRepository.getOptionalNode(eq(hostName))).thenReturn(Optional.of(node)); when(storageMaintainer.getDiskUsageFor(eq(context))).thenReturn(Optional.of(201326592000L)); - nodeAgent.tick(); + nodeAgent.doConverge(context); verify(dockerOperations, times(1)).removeContainer(eq(context), any()); verify(dockerOperations, times(1)).createContainer(eq(context), eq(node), any()); @@ -511,6 +517,7 @@ public class NodeAgentImplTest { .vespaVersion(vespaVersion) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true); when(nodeRepository.getOptionalNode(eq(hostName))).thenReturn(Optional.of(node)); @@ -523,7 +530,7 @@ public class NodeAgentImplTest { // 1st try try { - nodeAgent.converge(); + nodeAgent.doConverge(context); fail("Expected to throw an exception"); } catch (RuntimeException ignored) { } @@ -531,7 +538,7 @@ public class NodeAgentImplTest { inOrder.verifyNoMoreInteractions(); // 2nd try - nodeAgent.converge(); + nodeAgent.doConverge(context); inOrder.verify(dockerOperations).resumeNode(any()); inOrder.verify(orchestrator).resume(hostName); @@ -539,33 +546,6 @@ public class NodeAgentImplTest { } @Test - public void testSetFrozen() { - NodeAgentImpl nodeAgent = spy(makeNodeAgent(dockerImage, true)); - doNothing().when(nodeAgent).converge(); - - nodeAgent.tick(); - verify(nodeAgent, times(1)).converge(); - - assertFalse(nodeAgent.setFrozen(true)); // Returns true because we are not frozen until tick is called - nodeAgent.tick(); - verify(nodeAgent, times(1)).converge(); // Frozen should be set, therefore converge is never called - - assertTrue(nodeAgent.setFrozen(true)); // Attempt to set frozen again, but it's already set - clock.advance(Duration.ofSeconds(35)); // workToDoNow is no longer set, so we need to wait the regular time - nodeAgent.tick(); - verify(nodeAgent, times(1)).converge(); - - assertFalse(nodeAgent.setFrozen(false)); // Unfreeze, but still need to call tick for it to take effect - nodeAgent.tick(); - verify(nodeAgent, times(2)).converge(); - - assertTrue(nodeAgent.setFrozen(false)); - clock.advance(Duration.ofSeconds(35)); // workToDoNow is no longer set, so we need to wait the regular time - nodeAgent.tick(); - verify(nodeAgent, times(3)).converge(); - } - - @Test public void start_container_subtask_failure_leads_to_container_restart() { final NodeSpec node = nodeBuilder .wantedDockerImage(dockerImage) @@ -573,32 +553,32 @@ public class NodeAgentImplTest { .wantedVespaVersion(vespaVersion) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = spy(makeNodeAgent(null, false)); - when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node)); when(dockerOperations.pullImageAsyncIfNeeded(eq(dockerImage))).thenReturn(false); when(storageMaintainer.getDiskUsageFor(eq(context))).thenReturn(Optional.of(201326592000L)); doThrow(new DockerException("Failed to set up network")).doNothing().when(dockerOperations).startContainer(eq(context)); try { - nodeAgent.converge(); + nodeAgent.doConverge(context); fail("Expected to get DockerException"); } catch (DockerException ignored) { } verify(dockerOperations, never()).removeContainer(eq(context), any()); verify(dockerOperations, times(1)).createContainer(eq(context), eq(node), any()); verify(dockerOperations, times(1)).startContainer(eq(context)); - verify(nodeAgent, never()).resumeNodeIfNeeded(any()); + verify(nodeAgent, never()).resumeNodeIfNeeded(any(), any()); // The docker container was actually started and is running, but subsequent exec calls to set up // networking failed mockGetContainer(dockerImage, true); - nodeAgent.converge(); + nodeAgent.doConverge(context); verify(dockerOperations, times(1)).removeContainer(eq(context), any()); verify(dockerOperations, times(2)).createContainer(eq(context), eq(node), any()); verify(dockerOperations, times(2)).startContainer(eq(context)); - verify(nodeAgent, times(1)).resumeNodeIfNeeded(any()); + verify(nodeAgent, times(1)).resumeNodeIfNeeded(any(), any()); } @Test @@ -631,6 +611,7 @@ public class NodeAgentImplTest { .parentHostname("parent.host.name.yahoo.com") .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(dockerImage, true); when(nodeRepository.getOptionalNode(eq(hostName))).thenReturn(Optional.of(node)); @@ -638,12 +619,9 @@ public class NodeAgentImplTest { when(dockerOperations.getContainerStats(eq(context))) .thenReturn(Optional.of(stats1)) .thenReturn(Optional.of(stats2)); - - nodeAgent.converge(); // Run the converge loop once to initialize lastNode + nodeAgent.updateContainerNodeMetrics(); // Update metrics once to init and lastCpuMetric - clock.advance(Duration.ofSeconds(1234)); - Path pathToExpectedMetrics = Paths.get(classLoader.getResource("expected.container.system.metrics.txt").getPath()); String expectedMetrics = new String(Files.readAllBytes(pathToExpectedMetrics)) .replaceAll("\\s", "") @@ -674,13 +652,11 @@ public class NodeAgentImplTest { .state(Node.State.ready) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(null, false); - when(nodeRepository.getOptionalNode(eq(hostName))).thenReturn(Optional.of(node)); when(dockerOperations.getContainerStats(eq(context))).thenReturn(Optional.empty()); - nodeAgent.converge(); // Run the converge loop once to initialize lastNode - nodeAgent.updateContainerNodeMetrics(); Set<Map<String, Object>> actualMetrics = metricReceiver.getAllMetricsRaw(); @@ -696,13 +672,14 @@ public class NodeAgentImplTest { .wantedVespaVersion(vespaVersion) .build(); + NodeAgentContext context = createContext(node); NodeAgentImpl nodeAgent = makeNodeAgent(null, false); when(nodeRepository.getOptionalNode(hostName)).thenReturn(Optional.of(node)); when(dockerOperations.pullImageAsyncIfNeeded(eq(dockerImage))).thenReturn(false); when(storageMaintainer.getDiskUsageFor(eq(context))).thenReturn(Optional.of(201326592000L)); - nodeAgent.converge(); + nodeAgent.doConverge(context); verify(dockerOperations, never()).removeContainer(eq(context), any()); verify(orchestrator, never()).suspend(any(String.class)); @@ -724,22 +701,31 @@ public class NodeAgentImplTest { when(dockerOperations.getContainerStats(any())).thenReturn(Optional.of(emptyContainerStats)); doNothing().when(storageMaintainer).writeMetricsConfig(any(), any()); - return new NodeAgentImpl(context, nodeRepository, orchestrator, dockerOperations, - storageMaintainer, clock, NODE_AGENT_SCAN_INTERVAL, Optional.of(athenzCredentialsMaintainer), Optional.of(aclMaintainer), + return new NodeAgentImpl(contextSupplier, nodeRepository, orchestrator, dockerOperations, + storageMaintainer, Optional.of(athenzCredentialsMaintainer), Optional.of(aclMaintainer), Optional.of(healthChecker)); } private void mockGetContainer(DockerImage dockerImage, boolean isRunning) { - Optional<Container> container = dockerImage != null ? - Optional.of(new Container( - hostName, - dockerImage, - ContainerResources.from(MIN_CPU_CORES, MIN_MAIN_MEMORY_AVAILABLE_GB), - context.containerName(), - isRunning ? Container.State.RUNNING : Container.State.EXITED, - isRunning ? 1 : 0)) : - Optional.empty(); - - when(dockerOperations.getContainer(eq(context))).thenReturn(container); + doAnswer(invoc -> { + NodeAgentContext context = invoc.getArgument(0); + if (!hostName.equals(context.hostname().value())) + throw new IllegalArgumentException(); + return dockerImage != null ? + Optional.of(new Container( + hostName, + dockerImage, + ContainerResources.from(MIN_CPU_CORES, MIN_MAIN_MEMORY_AVAILABLE_GB), + ContainerName.fromHostname(hostName), + isRunning ? Container.State.RUNNING : Container.State.EXITED, + isRunning ? 1 : 0)) : + Optional.empty(); + }).when(dockerOperations).getContainer(any()); + } + + private NodeAgentContext createContext(NodeSpec nodeSpec) { + NodeAgentContext context = new NodeAgentContextImpl.Builder(nodeSpec).build(); + when(contextSupplier.currentContext()).thenReturn(context); + return context; } } |