diff options
3 files changed, 140 insertions, 92 deletions
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgent.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgent.java index a5b31d95705..c030cfbe058 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgent.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgent.java @@ -20,9 +20,9 @@ public interface NodeAgent { void unfreeze(); /** - * Force the NodeAgent to check node repository. Intended for testing. + * Make NodeAgent check for work to be done. */ - void tick(); + void signalWorkToBeDone(); /** diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java index 3aee6be90af..9ec9d455cc4 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java @@ -9,8 +9,6 @@ import com.yahoo.vespa.hosted.node.admin.noderepository.NodeRepository; import com.yahoo.vespa.hosted.node.admin.orchestrator.Orchestrator; import java.io.IOException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; @@ -19,7 +17,6 @@ import java.util.logging.Logger; import static com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentImpl.ContainerState.ABSENT; import static com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentImpl.ContainerState.RUNNING; import static com.yahoo.vespa.hosted.node.admin.nodeagent.NodeAgentImpl.ContainerState.RUNNING_HOWEVER_RESUME_SCRIPT_NOT_RUN; -import static java.util.concurrent.TimeUnit.MILLISECONDS; /** * @author dybis @@ -27,9 +24,10 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; */ public class NodeAgentImpl implements NodeAgent { - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private AtomicBoolean isFrozen = new AtomicBoolean(false); private AtomicBoolean wantFrozen = new AtomicBoolean(false); + private AtomicBoolean terminated = new AtomicBoolean(false); + private AtomicBoolean workToDoNow = new AtomicBoolean(true); private static final Logger logger = Logger.getLogger(NodeAgentImpl.class.getName()); @@ -47,6 +45,9 @@ public class NodeAgentImpl implements NodeAgent { private AtomicReference<String> debugString = new AtomicReference<>("not started"); + private long delaysBetweenEachTickMillis; + + private Thread loopThread; public enum ContainerState { ABSENT, @@ -73,11 +74,13 @@ public class NodeAgentImpl implements NodeAgent { @Override public void freeze() { wantFrozen.set(true); + signalWorkToBeDone(); } @Override public void unfreeze() { wantFrozen.set(false); + signalWorkToBeDone(); } @Override @@ -92,18 +95,29 @@ public class NodeAgentImpl implements NodeAgent { @Override public void start(int intervalMillis) { - if (scheduler.isTerminated()) { + delaysBetweenEachTickMillis = intervalMillis; + if (loopThread != null) { throw new RuntimeException("Can not restart a node agent."); } - scheduler.scheduleWithFixedDelay(this::tick, intervalMillis, intervalMillis, MILLISECONDS); + loopThread = new Thread(this::loop); + loopThread.setName("loop-" + hostname.toString()); + loopThread.start(); } @Override public void stop() { - if (scheduler.isTerminated()) { + if (!terminated.compareAndSet(false, true)) { throw new RuntimeException("Can not re-stop a node agent."); } - scheduler.shutdown(); + signalWorkToBeDone(); + try { + loopThread.join(10000); + if (loopThread.isAlive()) { + logger.severe("Could not stop host thread " + hostname); + } + } catch (InterruptedException e1) { + logger.severe("Interrupted; Could not stop host thread " + hostname); + } } private void runLocalResumeScriptIfNeeded(final ContainerNodeSpec nodeSpec) { @@ -147,10 +161,6 @@ public class NodeAgentImpl implements NodeAgent { } } - private void nodeTickInNewThread() { - new Thread(this::tick).start(); - } - private void removeContainerIfNeededUpdateContainerState(ContainerNodeSpec nodeSpec) throws Exception { if (dockerOperations.removeContainerIfNeeded(nodeSpec, hostname, orchestrator)) { containerState = ABSENT; @@ -164,89 +174,110 @@ public class NodeAgentImpl implements NodeAgent { return; } imageBeingDownloaded = nodeSpec.wantedDockerImage.get(); - // Create a tick when download is finished. - dockerOperations.scheduleDownloadOfImage(nodeSpec, this::nodeTickInNewThread); + // Create a signalWorkToBeDone when download is finished. + dockerOperations.scheduleDownloadOfImage(nodeSpec, this::signalWorkToBeDone); } else { imageBeingDownloaded = null; } } @Override - public void tick() { - StringBuilder debugStringBuilder = new StringBuilder(hostname.toString()); + public void signalWorkToBeDone() { + workToDoNow.set(true); synchronized (monitor) { + monitor.notifyAll(); + } + } + + private void loop() { + while (! terminated.get()) { + if (! workToDoNow.get()) { + try { + synchronized (monitor) { + monitor.wait(delaysBetweenEachTickMillis); + } + } catch (InterruptedException e) { + logger.severe("Interrupted, but ignoring this: " + hostname); + continue; + } + } + workToDoNow.set(false); isFrozen.set(wantFrozen.get()); if (isFrozen.get()) { - debugStringBuilder.append(" frozen"); - debugString.set(debugStringBuilder.toString()); - return; - } - try { - final ContainerNodeSpec nodeSpec = nodeRepository.getContainerNodeSpec(hostname) - .orElseThrow(() -> - new IllegalStateException(String.format("Node '%s' missing from node repository.", hostname))); - debugStringBuilder.append("Loaded node spec: ").append(nodeSpec.toString()); - switch (nodeSpec.nodeState) { - case PROVISIONED: - removeContainerIfNeededUpdateContainerState(nodeSpec); - logger.log(LogLevel.INFO, logPrefix + "State is provisioned, will delete application storage and mark node as ready"); - dockerOperations.deleteContainerStorage(nodeSpec.containerName); - nodeRepository.markAsReady(nodeSpec.hostname); - break; - case READY: - removeContainerIfNeededUpdateContainerState(nodeSpec); - break; - case RESERVED: - removeContainerIfNeededUpdateContainerState(nodeSpec); - break; - case ACTIVE: - scheduleDownLoadIfNeeded(nodeSpec); - if (imageBeingDownloaded != null) { - debugStringBuilder.append("Waiting for image to download " + imageBeingDownloaded.asString()); - debugString.set(debugStringBuilder.toString()); - return; - } - removeContainerIfNeededUpdateContainerState(nodeSpec); - - startContainerIfNeeded(nodeSpec); - runLocalResumeScriptIfNeeded(nodeSpec); - // Because it's more important to stop a bad release from rolling out in prod, - // we put the resume call last. So if we fail after updating the node repo attributes - // but before resume, the app may go through the tenant pipeline but will halt in prod. - // - // Note that this problem exists only because there are 2 different mechanisms - // that should really be parts of a single mechanism: - // - The content of node repo is used to determine whether a new Vespa+application - // has been successfully rolled out. - // - Slobrok and internal orchestrator state is used to determine whether - // to allow upgrade (suspend). - publishStateToNodeRepoIfChanged(nodeSpec); - orchestrator.resume(nodeSpec.hostname); - break; - case INACTIVE: - removeContainerIfNeededUpdateContainerState(nodeSpec); - break; - case DIRTY: - removeContainerIfNeededUpdateContainerState(nodeSpec); - logger.log(LogLevel.INFO, logPrefix + "State is dirty, will delete application storage and mark node as ready"); - dockerOperations.deleteContainerStorage(nodeSpec.containerName); - nodeRepository.markAsReady(nodeSpec.hostname); - break; - case FAILED: - removeContainerIfNeededUpdateContainerState(nodeSpec); - break; - default: - throw new RuntimeException("UNKNOWN STATE " + nodeSpec.nodeState.name()); + debugString.set(hostname + " frozen"); + } else { + try { + tick(); + } catch (Exception e) { + logger.log(LogLevel.ERROR, logPrefix + "Unhandled exception, ignoring.", e); + debugString.set(hostname + " " + e.getMessage()); + } catch (Throwable t) { + logger.log(LogLevel.ERROR, logPrefix + "Unhandled throwable, taking down system.", t); + System.exit(234); } - } catch (Exception e) { - logger.log(LogLevel.ERROR, logPrefix + "Unhandled exception, ignoring.", e); - debugStringBuilder.append(e.getMessage()); - } catch (Throwable t) { - logger.log(LogLevel.ERROR, logPrefix + "Unhandled throwable, taking down system.", t); - System.exit(234); } - debugString.set(debugStringBuilder.toString()); + } + } + // For testing + public void tick() throws Exception { + StringBuilder debugStringBuilder = new StringBuilder(hostname.toString()); + final ContainerNodeSpec nodeSpec = nodeRepository.getContainerNodeSpec(hostname) + .orElseThrow(() -> + new IllegalStateException(String.format("Node '%s' missing from node repository.", hostname))); + debugStringBuilder.append("Loaded node spec: ").append(nodeSpec.toString()); + switch (nodeSpec.nodeState) { + case PROVISIONED: + removeContainerIfNeededUpdateContainerState(nodeSpec); + logger.log(LogLevel.INFO, logPrefix + "State is provisioned, will delete application storage and mark node as ready"); + dockerOperations.deleteContainerStorage(nodeSpec.containerName); + nodeRepository.markAsReady(nodeSpec.hostname); + break; + case READY: + removeContainerIfNeededUpdateContainerState(nodeSpec); + break; + case RESERVED: + removeContainerIfNeededUpdateContainerState(nodeSpec); + break; + case ACTIVE: + scheduleDownLoadIfNeeded(nodeSpec); + if (imageBeingDownloaded != null) { + debugStringBuilder.append("Waiting for image to download " + imageBeingDownloaded.asString()); + debugString.set(debugStringBuilder.toString()); + return; + } + removeContainerIfNeededUpdateContainerState(nodeSpec); + + startContainerIfNeeded(nodeSpec); + runLocalResumeScriptIfNeeded(nodeSpec); + // Because it's more important to stop a bad release from rolling out in prod, + // we put the resume call last. So if we fail after updating the node repo attributes + // but before resume, the app may go through the tenant pipeline but will halt in prod. + // + // Note that this problem exists only because there are 2 different mechanisms + // that should really be parts of a single mechanism: + // - The content of node repo is used to determine whether a new Vespa+application + // has been successfully rolled out. + // - Slobrok and internal orchestrator state is used to determine whether + // to allow upgrade (suspend). + publishStateToNodeRepoIfChanged(nodeSpec); + orchestrator.resume(nodeSpec.hostname); + break; + case INACTIVE: + removeContainerIfNeededUpdateContainerState(nodeSpec); + break; + case DIRTY: + removeContainerIfNeededUpdateContainerState(nodeSpec); + logger.log(LogLevel.INFO, logPrefix + "State is dirty, will delete application storage and mark node as ready"); + dockerOperations.deleteContainerStorage(nodeSpec.containerName); + nodeRepository.markAsReady(nodeSpec.hostname); + break; + case FAILED: + removeContainerIfNeededUpdateContainerState(nodeSpec); + break; + default: + throw new RuntimeException("UNKNOWN STATE " + nodeSpec.nodeState.name()); } + debugString.set(debugStringBuilder.toString()); } } diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java index 1f33dbea764..f7ce111fe05 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImplTest.java @@ -11,6 +11,8 @@ import com.yahoo.vespa.hosted.node.admin.docker.ProcessResult; import com.yahoo.vespa.hosted.node.admin.noderepository.NodeRepository; import com.yahoo.vespa.hosted.node.admin.noderepository.NodeState; import com.yahoo.vespa.hosted.node.admin.orchestrator.Orchestrator; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; @@ -52,7 +54,7 @@ public class NodeAgentImplTest { private final NodeRepository nodeRepository = mock(NodeRepository.class); private final Orchestrator orchestrator = mock(Orchestrator.class); - private final NodeAgent nodeAgent = new NodeAgentImpl(hostName, nodeRepository, orchestrator, new DockerOperations(docker)); + private final NodeAgentImpl nodeAgent = new NodeAgentImpl(hostName, nodeRepository, orchestrator, new DockerOperations(docker)); @Test public void upToDateContainerIsUntouched() throws Exception { @@ -336,7 +338,11 @@ public class NodeAgentImplTest { when(docker.getContainer(hostName)).thenReturn(Optional.of(existingContainer)); when(nodeRepository.getContainerNodeSpec(hostName)).thenReturn(Optional.of(nodeSpec)); - nodeAgent.tick(); + try { + nodeAgent.tick(); + fail("Expected to throw an exception"); + } catch (Exception e) { + } verify(orchestrator).suspend(hostName); verify(docker, never()).stopContainer(any(ContainerName.class)); @@ -898,8 +904,11 @@ public class NodeAgentImplTest { when(docker.getContainer(hostName)).thenReturn(Optional.of(existingContainer)); when(nodeRepository.getContainerNodeSpec(hostName)).thenReturn(Optional.of(nodeSpec1)); - nodeAgent.tick(); - + try { + nodeAgent.tick(); + fail("Expected to throw an exception"); + } catch (Exception e) { + } // Should get exactly one invocation. inOrder.verify(nodeRepository).updateNodeAttributes(hostName, restartGeneration, dockerImage1, vespaVersion); verify(nodeRepository, times(1)).updateNodeAttributes( @@ -951,8 +960,11 @@ public class NodeAgentImplTest { when(docker.getContainer(hostName)).thenReturn(NO_CONTAINER); when(nodeRepository.getContainerNodeSpec(hostName)).thenReturn(Optional.of(nodeSpec)); - nodeAgent.tick(); - + try { + nodeAgent.tick(); + fail("Expected to throw an exception"); + } catch (Exception e) { + } inOrder.verify(docker).startContainer( nodeSpec.wantedDockerImage.get(), nodeSpec.hostname, @@ -964,7 +976,12 @@ public class NodeAgentImplTest { inOrder.verifyNoMoreInteractions(); // 2nd try - nodeAgent.tick(); + try { + nodeAgent.tick(); + fail("Expected to throw an exception"); + } catch (Exception e) { + } + inOrder.verify(docker, times(2)).executeInContainer(any(), anyVararg()); inOrder.verifyNoMoreInteractions(); |