summary | refs | log | tree | commit | diff | stats
path: root/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
diff options
context:
space:
mode:
author: Valerij Fredriksen <valerijf@oath.com> 2019-01-09 15:13:24 +0100
committer: Valerij Fredriksen <valerijf@oath.com> 2019-01-09 18:50:56 +0100
commit: 0cf7ec5d521e1595289219fadf98db514b353c1c (patch)
tree: 9460c2972f24d6d78b9e7b8a27b8b7e360dfc7bd /node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
parent: f724bcfa584c590ad079cd2614aa21acc835e285 (diff)
Use NodeAgentContextSupplier in NodeAgent
Diffstat (limited to 'node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java')
-rw-r--r-- node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java 213
1 files changed, 66 insertions, 147 deletions
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
index 98975dddb56..ce1acea7d39 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeagent/NodeAgentImpl.java
@@ -17,20 +17,17 @@ import com.yahoo.vespa.hosted.dockerapi.metrics.Dimensions;
import com.yahoo.vespa.hosted.dockerapi.metrics.MetricReceiverWrapper;
import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeSpec;
import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeAttributes;
+import com.yahoo.vespa.hosted.node.admin.configserver.orchestrator.OrchestratorException;
import com.yahoo.vespa.hosted.node.admin.docker.DockerOperations;
import com.yahoo.vespa.hosted.node.admin.maintenance.StorageMaintainer;
import com.yahoo.vespa.hosted.node.admin.configserver.noderepository.NodeRepository;
import com.yahoo.vespa.hosted.node.admin.configserver.orchestrator.Orchestrator;
-import com.yahoo.vespa.hosted.node.admin.configserver.orchestrator.OrchestratorException;
import com.yahoo.vespa.hosted.node.admin.maintenance.acl.AclMaintainer;
import com.yahoo.vespa.hosted.node.admin.maintenance.identity.AthenzCredentialsMaintainer;
import com.yahoo.vespa.hosted.node.admin.nodeadmin.ConvergenceException;
import com.yahoo.vespa.hosted.node.admin.util.SecretAgentCheckConfig;
import com.yahoo.vespa.hosted.provision.Node;
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -59,30 +56,21 @@ public class NodeAgentImpl implements NodeAgent {
private static final Logger logger = Logger.getLogger(NodeAgentImpl.class.getName());
- private final Object monitor = new Object();
private final AtomicBoolean terminated = new AtomicBoolean(false);
-
- private boolean isFrozen = true;
- private boolean wantFrozen = false;
- private boolean workToDoNow = true;
- private boolean expectNodeNotInNodeRepo = false;
private boolean hasResumedNode = false;
private boolean hasStartedServices = true;
- private final NodeAgentContext context;
+ private final NodeAgentContextSupplier contextSupplier;
private final NodeRepository nodeRepository;
private final Orchestrator orchestrator;
private final DockerOperations dockerOperations;
private final StorageMaintainer storageMaintainer;
- private final Clock clock;
- private final Duration timeBetweenEachConverge;
private final Optional<AthenzCredentialsMaintainer> athenzCredentialsMaintainer;
private final Optional<AclMaintainer> aclMaintainer;
private final Optional<HealthChecker> healthChecker;
private int numberOfUnhandledException = 0;
private DockerImage imageBeingDownloaded = null;
- private Instant lastConverge;
private long currentRebootGeneration = 0;
private Optional<Long> currentRestartGeneration = Optional.empty();
@@ -115,24 +103,19 @@ public class NodeAgentImpl implements NodeAgent {
// Created in NodeAdminImpl
public NodeAgentImpl(
- final NodeAgentContext context,
+ final NodeAgentContextSupplier contextSupplier,
final NodeRepository nodeRepository,
final Orchestrator orchestrator,
final DockerOperations dockerOperations,
final StorageMaintainer storageMaintainer,
- final Clock clock,
- final Duration timeBetweenEachConverge,
final Optional<AthenzCredentialsMaintainer> athenzCredentialsMaintainer,
final Optional<AclMaintainer> aclMaintainer,
final Optional<HealthChecker> healthChecker) {
- this.context = context;
+ this.contextSupplier = contextSupplier;
this.nodeRepository = nodeRepository;
this.orchestrator = orchestrator;
this.dockerOperations = dockerOperations;
this.storageMaintainer = storageMaintainer;
- this.clock = clock;
- this.timeBetweenEachConverge = timeBetweenEachConverge;
- this.lastConverge = clock.instant();
this.athenzCredentialsMaintainer = athenzCredentialsMaintainer;
this.aclMaintainer = aclMaintainer;
this.healthChecker = healthChecker;
@@ -140,16 +123,15 @@ public class NodeAgentImpl implements NodeAgent {
this.loopThread = new Thread(() -> {
while (!terminated.get()) {
try {
- tick();
- } catch (Throwable t) {
- numberOfUnhandledException++;
- context.log(logger, LogLevel.ERROR, "Unhandled throwable, ignoring", t);
- }
+ NodeAgentContext context = contextSupplier.nextContext();
+ converge(context);
+ } catch (InterruptedException ignored) { }
}
});
- this.loopThread.setName("tick-" + context.hostname());
+ this.loopThread.setName("tick-" + contextSupplier.currentContext().hostname());
this.serviceRestarter = service -> {
+ NodeAgentContext context = contextSupplier.currentContext();
try {
ProcessResult processResult = dockerOperations.executeCommandInContainerAsRoot(
context, "service", service, "restart");
@@ -164,46 +146,29 @@ public class NodeAgentImpl implements NodeAgent {
}
@Override
- public boolean setFrozen(boolean frozen) {
- synchronized (monitor) {
- if (wantFrozen != frozen) {
- wantFrozen = frozen;
- context.log(logger, LogLevel.DEBUG, wantFrozen ? "Freezing" : "Unfreezing");
- signalWorkToBeDone();
- }
-
- return isFrozen == frozen;
- }
- }
-
- @Override
public void start() {
- context.log(logger, "Starting with interval " + timeBetweenEachConverge.toMillis() + " ms");
loopThread.start();
}
@Override
public void stop() {
- filebeatRestarter.shutdown();
if (!terminated.compareAndSet(false, true)) {
throw new RuntimeException("Can not re-stop a node agent.");
}
- signalWorkToBeDone();
+ filebeatRestarter.shutdown();
+ contextSupplier.interrupt();
do {
try {
loopThread.join();
filebeatRestarter.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
- } catch (InterruptedException e) {
- context.log(logger, LogLevel.ERROR,
- "Interrupted while waiting for converge thread and filebeatRestarter scheduler to shutdown");
- }
+ } catch (InterruptedException ignored) { }
} while (loopThread.isAlive() || !filebeatRestarter.isTerminated());
- context.log(logger, "Stopped");
+ contextSupplier.currentContext().log(logger, "Stopped");
}
- void startServicesIfNeeded() {
+ void startServicesIfNeeded(NodeAgentContext context) {
if (!hasStartedServices) {
context.log(logger, "Starting services");
dockerOperations.startServices(context);
@@ -211,7 +176,7 @@ public class NodeAgentImpl implements NodeAgent {
}
}
- void resumeNodeIfNeeded(NodeSpec node) {
+ void resumeNodeIfNeeded(NodeAgentContext context, NodeSpec node) {
if (!hasResumedNode) {
if (!currentFilebeatRestarter.isPresent()) {
storageMaintainer.writeMetricsConfig(context, node);
@@ -225,7 +190,7 @@ public class NodeAgentImpl implements NodeAgent {
}
}
- private void updateNodeRepoWithCurrentAttributes(final NodeSpec node) {
+ private void updateNodeRepoWithCurrentAttributes(NodeAgentContext context, NodeSpec node) {
final NodeAttributes currentNodeAttributes = new NodeAttributes();
final NodeAttributes newNodeAttributes = new NodeAttributes();
@@ -246,10 +211,10 @@ public class NodeAgentImpl implements NodeAgent {
newNodeAttributes.withDockerImage(actualDockerImage.orElse(new DockerImage("")));
}
- publishStateToNodeRepoIfChanged(currentNodeAttributes, newNodeAttributes);
+ publishStateToNodeRepoIfChanged(context, currentNodeAttributes, newNodeAttributes);
}
- private void publishStateToNodeRepoIfChanged(NodeAttributes currentAttributes, NodeAttributes newAttributes) {
+ private void publishStateToNodeRepoIfChanged(NodeAgentContext context, NodeAttributes currentAttributes, NodeAttributes newAttributes) {
if (!currentAttributes.equals(newAttributes)) {
context.log(logger, "Publishing new set of attributes to node repo: %s -> %s",
currentAttributes, newAttributes);
@@ -257,7 +222,7 @@ public class NodeAgentImpl implements NodeAgent {
}
}
- private void startContainer(NodeSpec node) {
+ private void startContainer(NodeAgentContext context, NodeSpec node) {
ContainerData containerData = createContainerData(context, node);
dockerOperations.createContainer(context, node, containerData);
dockerOperations.startContainer(context);
@@ -268,13 +233,14 @@ public class NodeAgentImpl implements NodeAgent {
context.log(logger, "Container successfully started, new containerState is " + containerState);
}
- private Optional<Container> removeContainerIfNeededUpdateContainerState(NodeSpec node, Optional<Container> existingContainer) {
+ private Optional<Container> removeContainerIfNeededUpdateContainerState(
+ NodeAgentContext context, NodeSpec node, Optional<Container> existingContainer) {
return existingContainer
- .flatMap(container -> removeContainerIfNeeded(node, container))
+ .flatMap(container -> removeContainerIfNeeded(context, node, container))
.map(container -> {
shouldRestartServices(node).ifPresent(restartReason -> {
context.log(logger, "Will restart services: " + restartReason);
- restartServices(node, container);
+ restartServices(context, node, container);
currentRestartGeneration = node.getWantedRestartGeneration();
});
return container;
@@ -292,17 +258,18 @@ public class NodeAgentImpl implements NodeAgent {
return Optional.empty();
}
- private void restartServices(NodeSpec node, Container existingContainer) {
+ private void restartServices(NodeAgentContext context, NodeSpec node, Container existingContainer) {
if (existingContainer.state.isRunning() && node.getState() == Node.State.active) {
context.log(logger, "Restarting services");
// Since we are restarting the services we need to suspend the node.
- orchestratorSuspendNode();
+ orchestratorSuspendNode(context);
dockerOperations.restartVespa(context);
}
}
@Override
public void stopServices() {
+ NodeAgentContext context = contextSupplier.currentContext();
context.log(logger, "Stopping services");
if (containerState == ABSENT) return;
try {
@@ -315,6 +282,7 @@ public class NodeAgentImpl implements NodeAgent {
@Override
public void suspend() {
+ NodeAgentContext context = contextSupplier.currentContext();
context.log(logger, "Suspending services on node");
if (containerState == ABSENT) return;
try {
@@ -358,14 +326,14 @@ public class NodeAgentImpl implements NodeAgent {
return Optional.empty();
}
- private Optional<Container> removeContainerIfNeeded(NodeSpec node, Container existingContainer) {
+ private Optional<Container> removeContainerIfNeeded(NodeAgentContext context, NodeSpec node, Container existingContainer) {
Optional<String> removeReason = shouldRemoveContainer(node, existingContainer);
if (removeReason.isPresent()) {
context.log(logger, "Will remove container: " + removeReason.get());
if (existingContainer.state.isRunning()) {
if (node.getState() == Node.State.active) {
- orchestratorSuspendNode();
+ orchestratorSuspendNode(context);
}
try {
@@ -399,78 +367,29 @@ public class NodeAgentImpl implements NodeAgent {
}
}
- private void signalWorkToBeDone() {
- synchronized (monitor) {
- if (!workToDoNow) {
- workToDoNow = true;
- context.log(logger, LogLevel.DEBUG, "Signaling work to be done");
- monitor.notifyAll();
- }
- }
- }
-
- void tick() {
- boolean isFrozenCopy;
- synchronized (monitor) {
- while (!workToDoNow) {
- long remainder = timeBetweenEachConverge
- .minus(Duration.between(lastConverge, clock.instant()))
- .toMillis();
- if (remainder > 0) {
- try {
- monitor.wait(remainder);
- } catch (InterruptedException e) {
- context.log(logger, LogLevel.ERROR, "Interrupted while sleeping before tick, ignoring");
- }
- } else break;
- }
- lastConverge = clock.instant();
- workToDoNow = false;
-
- if (isFrozen != wantFrozen) {
- isFrozen = wantFrozen;
- context.log(logger, "Updated NodeAgent's frozen state, new value: isFrozen: " + isFrozen);
- }
- isFrozenCopy = isFrozen;
- }
-
- if (isFrozenCopy) {
- context.log(logger, LogLevel.DEBUG, "tick: isFrozen");
- } else {
- try {
- converge();
- } catch (OrchestratorException | ConvergenceException e) {
- context.log(logger, e.getMessage());
- } catch (ContainerNotFoundException e) {
- containerState = ABSENT;
- context.log(logger, LogLevel.WARNING, "Container unexpectedly gone, resetting containerState to " + containerState);
- } catch (DockerException e) {
- numberOfUnhandledException++;
- context.log(logger, LogLevel.ERROR, "Caught a DockerException", e);
- } catch (Exception e) {
- numberOfUnhandledException++;
- context.log(logger, LogLevel.ERROR, "Unhandled exception, ignoring.", e);
- }
/**
 * Runs one convergence pass for the given context by delegating to
 * {@code doConverge(context)} and catching whatever it throws, so that a
 * failing pass is logged here instead of being thrown to the caller.
 *
 * @param context the context handed to {@code doConverge} and used for all logging in this method
 */
public void converge(NodeAgentContext context) {
try {
doConverge(context);
} catch (OrchestratorException | ConvergenceException e) {
// Only the exception message is logged (no stack trace), and the
// unhandled-exception counter is not incremented for these two types.
context.log(logger, e.getMessage());
} catch (ContainerNotFoundException e) {
// Remember that the container is gone; the counter is not incremented here either.
containerState = ABSENT;
context.log(logger, LogLevel.WARNING, "Container unexpectedly gone, resetting containerState to " + containerState);
} catch (DockerException e) {
// NOTE(review): presumably ContainerNotFoundException is a more specific Docker
// failure than DockerException, which would make the clause order significant — confirm.
numberOfUnhandledException++;
context.log(logger, LogLevel.ERROR, "Caught a DockerException", e);
} catch (Throwable e) {
// Catch-all for anything else (including Errors): counted, logged at ERROR with the
// throwable attached, and not rethrown.
numberOfUnhandledException++;
context.log(logger, LogLevel.ERROR, "Unhandled exception, ignoring", e);
}
}
// Visible for testing (package-private)
- void converge() {
- final Optional<NodeSpec> optionalNode = nodeRepository.getOptionalNode(context.hostname().value());
-
- // We just removed the node from node repo, so this is expected until NodeAdmin stop this NodeAgent
- if (!optionalNode.isPresent() && expectNodeNotInNodeRepo) {
- context.log(logger, LogLevel.INFO, "Node removed from node repo (as expected)");
- return;
- }
-
- final NodeSpec node = optionalNode.orElseThrow(() ->
- new IllegalStateException(String.format("Node '%s' missing from node repository", context.hostname())));
- expectNodeNotInNodeRepo = false;
-
- Optional<Container> container = getContainer();
+ void doConverge(NodeAgentContext context) {
+ NodeSpec node = context.node();
+ Optional<Container> container = getContainer(context);
if (!node.equals(lastNode)) {
- logChangesToNodeSpec(lastNode, node);
+ logChangesToNodeSpec(context, lastNode, node);
// Current reboot generation uninitialized or incremented from outside to cancel reboot
if (currentRebootGeneration < node.getCurrentRebootGeneration())
@@ -496,8 +415,8 @@ public class NodeAgentImpl implements NodeAgent {
case reserved:
case parked:
case failed:
- removeContainerIfNeededUpdateContainerState(node, container);
- updateNodeRepoWithCurrentAttributes(node);
+ removeContainerIfNeededUpdateContainerState(context, node, container);
+ updateNodeRepoWithCurrentAttributes(context, node);
break;
case active:
storageMaintainer.handleCoreDumpsForContainer(context, node, container);
@@ -512,17 +431,17 @@ public class NodeAgentImpl implements NodeAgent {
context.log(logger, LogLevel.DEBUG, "Waiting for image to download " + imageBeingDownloaded.asString());
return;
}
- container = removeContainerIfNeededUpdateContainerState(node, container);
+ container = removeContainerIfNeededUpdateContainerState(context, node, container);
athenzCredentialsMaintainer.ifPresent(maintainer -> maintainer.converge(context));
if (! container.isPresent()) {
containerState = STARTING;
- startContainer(node);
+ startContainer(context, node);
containerState = UNKNOWN;
aclMaintainer.ifPresent(AclMaintainer::converge);
}
- startServicesIfNeeded();
- resumeNodeIfNeeded(node);
+ startServicesIfNeeded(context);
+ resumeNodeIfNeeded(context, node);
healthChecker.ifPresent(checker -> checker.verifyHealth(context));
// Because it's more important to stop a bad release from rolling out in prod,
@@ -535,32 +454,31 @@ public class NodeAgentImpl implements NodeAgent {
// has been successfully rolled out.
// - Slobrok and internal orchestrator state is used to determine whether
// to allow upgrade (suspend).
- updateNodeRepoWithCurrentAttributes(node);
+ updateNodeRepoWithCurrentAttributes(context, node);
context.log(logger, "Call resume against Orchestrator");
orchestrator.resume(context.hostname().value());
break;
case inactive:
- removeContainerIfNeededUpdateContainerState(node, container);
- updateNodeRepoWithCurrentAttributes(node);
+ removeContainerIfNeededUpdateContainerState(context, node, container);
+ updateNodeRepoWithCurrentAttributes(context, node);
break;
case provisioned:
nodeRepository.setNodeState(context.hostname().value(), Node.State.dirty);
break;
case dirty:
- removeContainerIfNeededUpdateContainerState(node, container);
+ removeContainerIfNeededUpdateContainerState(context, node, container);
context.log(logger, "State is " + node.getState() + ", will delete application storage and mark node as ready");
athenzCredentialsMaintainer.ifPresent(maintainer -> maintainer.clearCredentials(context));
storageMaintainer.archiveNodeStorage(context);
- updateNodeRepoWithCurrentAttributes(node);
+ updateNodeRepoWithCurrentAttributes(context, node);
nodeRepository.setNodeState(context.hostname().value(), Node.State.ready);
- expectNodeNotInNodeRepo = true;
break;
default:
throw new RuntimeException("UNKNOWN STATE " + node.getState().name());
}
}
- private void logChangesToNodeSpec(NodeSpec lastNode, NodeSpec node) {
+ private void logChangesToNodeSpec(NodeAgentContext context, NodeSpec lastNode, NodeSpec node) {
StringBuilder builder = new StringBuilder();
appendIfDifferent(builder, "state", lastNode, node, NodeSpec::getState);
if (builder.length() > 0) {
@@ -592,8 +510,9 @@ public class NodeAgentImpl implements NodeAgent {
@SuppressWarnings("unchecked")
public void updateContainerNodeMetrics() {
- final NodeSpec node = lastNode;
- if (node == null || containerState != UNKNOWN) return;
+ if (containerState != UNKNOWN) return;
+ final NodeAgentContext context = contextSupplier.currentContext();
+ final NodeSpec node = context.node();
Optional<ContainerStats> containerStats = dockerOperations.getContainerStats(context);
if (!containerStats.isPresent()) return;
@@ -660,10 +579,10 @@ public class NodeAgentImpl implements NodeAgent {
metrics.add(networkMetrics);
});
- pushMetricsToContainer(metrics);
+ pushMetricsToContainer(context, metrics);
}
- private void pushMetricsToContainer(List<DimensionMetrics> metrics) {
+ private void pushMetricsToContainer(NodeAgentContext context, List<DimensionMetrics> metrics) {
StringBuilder params = new StringBuilder();
try {
for (DimensionMetrics dimensionMetrics : metrics) {
@@ -679,7 +598,7 @@ public class NodeAgentImpl implements NodeAgent {
}
}
- private Optional<Container> getContainer() {
+ private Optional<Container> getContainer(NodeAgentContext context) {
if (containerState == ABSENT) return Optional.empty();
Optional<Container> container = dockerOperations.getContainer(context);
if (! container.isPresent()) containerState = ABSENT;
@@ -743,7 +662,7 @@ public class NodeAgentImpl implements NodeAgent {
// More generally, the node repo response should contain sufficient info on what the docker image is,
// to allow the node admin to make decisions that depend on the docker image. Or, each docker image
// needs to contain routines for drain and suspend. For many images, these can just be dummy routines.
- private void orchestratorSuspendNode() {
+ private void orchestratorSuspendNode(NodeAgentContext context) {
context.log(logger, "Ask Orchestrator for permission to suspend node");
orchestrator.suspend(context.hostname().value());
}