summaryrefslogtreecommitdiffstats
path: root/node-admin/src/main
diff options
context:
space:
mode:
authorValerij Fredriksen <valerijf@oath.com>2018-08-30 00:02:28 +0200
committerValerij Fredriksen <valerij92@gmail.com>2018-08-30 00:02:28 +0200
commit379c2c642df76f3114b51c4ffc5b52af0499fbad (patch)
tree0febf5ceef947e2c842645d03020d9e4dd60185f /node-admin/src/main
parentda9446f3584675283992abd084d613cfbe593126 (diff)
Simplify synchronizeNodesToNodeAgents()
Diffstat (limited to 'node-admin/src/main')
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java95
1 files changed, 34 insertions, 61 deletions
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java
index f6e05b92310..ecbd0ff8357 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java
@@ -1,9 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hosted.node.admin.nodeadmin;
-import com.yahoo.collections.Pair;
import com.yahoo.concurrent.ThreadFactoryFactory;
-import com.yahoo.vespa.hosted.dockerapi.ContainerName;
+import com.yahoo.vespa.hosted.dockerapi.Container;
import com.yahoo.vespa.hosted.dockerapi.metrics.CounterWrapper;
import com.yahoo.vespa.hosted.dockerapi.metrics.Dimensions;
import com.yahoo.vespa.hosted.dockerapi.metrics.GaugeWrapper;
@@ -21,7 +20,6 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -29,7 +27,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
/**
* Administers a host (for now only docker hosts) and its nodes (docker container nodes).
@@ -53,7 +50,7 @@ public class NodeAdminImpl implements NodeAdmin {
private boolean isFrozen;
private Instant startOfFreezeConvergence;
- private final Map<ContainerName, NodeAgent> nodeAgents = new ConcurrentHashMap<>();
+ private final Map<String, NodeAgent> nodeAgentsByHostname = new ConcurrentHashMap<>();
private final GaugeWrapper numberOfContainersInLoadImageState;
private final CounterWrapper numberOfUnhandledExceptionsInNodeAgent;
@@ -81,15 +78,12 @@ public class NodeAdminImpl implements NodeAdmin {
@Override
public void refreshContainersToRun(List<NodeSpec> containersToRun) {
- final List<ContainerName> existingContainerNames = dockerOperations.getAllManagedContainers().stream()
- .map(container -> container.name)
- .collect(Collectors.toList());
- final List<String> hostnamesOfContainersToRun = containersToRun.stream()
+ final Set<String> hostnamesOfContainersToRun = containersToRun.stream()
.map(NodeSpec::getHostname)
- .collect(Collectors.toList());
+ .collect(Collectors.toSet());
storageMaintainer.cleanNodeAdmin();
- synchronizeNodesToNodeAgents(hostnamesOfContainersToRun, existingContainerNames);
+ synchronizeNodesToNodeAgents(hostnamesOfContainersToRun);
dockerOperations.deleteUnusedDockerImages();
updateNodeAgentMetrics();
@@ -99,7 +93,7 @@ public class NodeAdminImpl implements NodeAdmin {
int numberContainersWaitingImage = 0;
int numberOfNewUnhandledExceptions = 0;
- for (NodeAgent nodeAgent : nodeAgents.values()) {
+ for (NodeAgent nodeAgent : nodeAgentsByHostname.values()) {
if (nodeAgent.isDownloadingImage()) numberContainersWaitingImage++;
numberOfNewUnhandledExceptions += nodeAgent.getAndResetNumberOfUnhandledExceptions();
}
@@ -121,7 +115,7 @@ public class NodeAdminImpl implements NodeAdmin {
}
// Use filter with count instead of allMatch() because allMatch() will short circuit on first non-match
- boolean allNodeAgentsConverged = nodeAgents.values().stream()
+ boolean allNodeAgentsConverged = nodeAgentsByHostname.values().stream()
.filter(nodeAgent -> !nodeAgent.setFrozen(wantFrozen))
.count() == 0;
@@ -149,13 +143,13 @@ public class NodeAdminImpl implements NodeAdmin {
@Override
public void stopNodeAgentServices(List<String> hostnames) {
// Each container may spend 1-1:30 minutes stopping
- nodeAgents.values().parallelStream()
+ nodeAgentsByHostname.values().parallelStream()
.filter(nodeAgent -> hostnames.contains(nodeAgent.getHostname()))
.forEach(NodeAgent::stopServices);
}
public int getNumberOfNodeAgents() {
- return nodeAgents.keySet().size();
+ return nodeAgentsByHostname.keySet().size();
}
@Override
@@ -163,7 +157,7 @@ public class NodeAdminImpl implements NodeAdmin {
Map<String, Object> debug = new LinkedHashMap<>();
debug.put("isFrozen", isFrozen);
- List<Map<String, Object>> nodeAgentDebugs = nodeAgents.entrySet().stream()
+ List<Map<String, Object>> nodeAgentDebugs = nodeAgentsByHostname.entrySet().stream()
.map(node -> node.getValue().debugInfo()).collect(Collectors.toList());
debug.put("NodeAgents", nodeAgentDebugs);
return debug;
@@ -173,7 +167,7 @@ public class NodeAdminImpl implements NodeAdmin {
public void start() {
metricsScheduler.scheduleAtFixedRate(() -> {
try {
- nodeAgents.values().forEach(NodeAgent::updateContainerNodeMetrics);
+ nodeAgentsByHostname.values().forEach(NodeAgent::updateContainerNodeMetrics);
} catch (Throwable e) {
logger.warning("Metric fetcher scheduler failed", e);
}
@@ -191,7 +185,7 @@ public class NodeAdminImpl implements NodeAdmin {
aclScheduler.shutdown();
// Stop all node-agents in parallel, will block until the last NodeAgent is stopped
- nodeAgents.values().parallelStream().forEach(NodeAgent::stop);
+ nodeAgentsByHostname.values().parallelStream().forEach(NodeAgent::stop);
do {
try {
@@ -210,55 +204,34 @@ public class NodeAdminImpl implements NodeAdmin {
return result;
}
- // Returns a full outer join of two data sources (of types T and U) on some extractable attribute (of type V).
- // Full outer join means that all elements of both data sources are included in the result,
- // even when there is no corresponding element (having the same attribute) in the other data set,
- // in which case the value from the other source will be empty.
- static <T, U, V> Stream<Pair<Optional<T>, Optional<U>>> fullOuterJoin(
- final Stream<T> tStream, final Function<T, V> tAttributeExtractor,
- final Stream<U> uStream, final Function<U, V> uAttributeExtractor) {
- final Map<V, T> tMap = tStream.collect(Collectors.toMap(tAttributeExtractor, t -> t));
- final Map<V, U> uMap = uStream.collect(Collectors.toMap(uAttributeExtractor, u -> u));
- return Stream.concat(tMap.keySet().stream(), uMap.keySet().stream())
- .distinct()
- .map(key -> new Pair<>(Optional.ofNullable(tMap.get(key)), Optional.ofNullable(uMap.get(key))));
- }
+ void synchronizeNodesToNodeAgents(Set<String> hostnamesToRun) {
+ Map<String, Container> runningContainersByHostname = dockerOperations.getAllManagedContainers().stream()
+ .collect(Collectors.toMap(c -> c.hostname, c -> c));
- // The method streams the list of containers twice.
- void synchronizeNodesToNodeAgents(
- final List<String> hostnamesToRun,
- final List<ContainerName> existingContainers) {
- final Map<ContainerName, String> hostnameByContainerName = hostnamesToRun.stream()
- .collect(Collectors.toMap(ContainerName::fromHostname, i -> i));
- final Stream<Pair<Optional<ContainerName>, Optional<ContainerName>>> nodeSpecContainerPairs = fullOuterJoin(
- hostnameByContainerName.keySet().stream(), containerName -> containerName,
- existingContainers.stream(), containerName -> containerName);
-
- final Set<ContainerName> obsoleteAgentContainerNames = diff(nodeAgents.keySet(), hostnameByContainerName.keySet());
- obsoleteAgentContainerNames.forEach(containerName -> nodeAgents.remove(containerName).stop());
-
- nodeSpecContainerPairs.forEach(nodeSpecContainerPair -> {
- final Optional<ContainerName> nodeSpec = nodeSpecContainerPair.getFirst();
- final Optional<ContainerName> existingContainer = nodeSpecContainerPair.getSecond();
-
- if (!nodeSpec.isPresent()) {
- assert existingContainer.isPresent();
- logger.warning("Container " + existingContainer.get().asString() + " exists, but is not in node repository runlist");
- return;
- }
+ // Stop and remove NodeAgents that should no longer be running
+ diff(nodeAgentsByHostname.keySet(), hostnamesToRun)
+ .forEach(hostname -> nodeAgentsByHostname.remove(hostname).stop());
+
+ // Start NodeAgent for hostnames that should be running, but aren't yet
+ diff(hostnamesToRun, nodeAgentsByHostname.keySet())
+ .forEach(this::startNodeAgent);
- ensureNodeAgentForNodeIsStarted(nodeSpec.get(), hostnameByContainerName.get(nodeSpec.get()));
- });
+ // Remove containers that are running but have no NodeAgent managing them (after the previous steps,
+ // such containers shouldn't be running, otherwise a NodeAgent would have been created for them)
+ diff(runningContainersByHostname.keySet(), nodeAgentsByHostname.keySet())
+ .forEach(hostname -> dockerOperations.removeContainer(runningContainersByHostname.get(hostname)));
}
- private void ensureNodeAgentForNodeIsStarted(ContainerName containerName, String hostname) {
- if (nodeAgents.containsKey(containerName)) {
- return;
- }
+ private void startNodeAgent(String hostname) {
+ if (nodeAgentsByHostname.containsKey(hostname))
+ throw new IllegalArgumentException("Attempted to start NodeAgent for hostname " + hostname +
+ ", but one is already running!");
- final NodeAgent agent = nodeAgentFactory.apply(hostname);
+ NodeAgent agent = nodeAgentFactory.apply(hostname);
agent.start();
- nodeAgents.put(containerName, agent);
+ nodeAgentsByHostname.put(hostname, agent);
+
+ // Sleep between starting NodeAgents to reduce simultaneous load against docker daemon when node-admin starts
try {
Thread.sleep(1000);
} catch (InterruptedException e) {