diff options
author | Valerij Fredriksen <valerijf@oath.com> | 2018-08-30 00:02:28 +0200 |
---|---|---|
committer | Valerij Fredriksen <valerij92@gmail.com> | 2018-08-30 00:02:28 +0200 |
commit | 379c2c642df76f3114b51c4ffc5b52af0499fbad (patch) | |
tree | 0febf5ceef947e2c842645d03020d9e4dd60185f /node-admin/src/main | |
parent | da9446f3584675283992abd084d613cfbe593126 (diff) |
Simplify synchronizeNodesToNodeAgents()
Diffstat (limited to 'node-admin/src/main')
-rw-r--r-- | node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java | 95 |
1 files changed, 34 insertions, 61 deletions
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java index f6e05b92310..ecbd0ff8357 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/nodeadmin/NodeAdminImpl.java @@ -1,9 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.node.admin.nodeadmin; -import com.yahoo.collections.Pair; import com.yahoo.concurrent.ThreadFactoryFactory; -import com.yahoo.vespa.hosted.dockerapi.ContainerName; +import com.yahoo.vespa.hosted.dockerapi.Container; import com.yahoo.vespa.hosted.dockerapi.metrics.CounterWrapper; import com.yahoo.vespa.hosted.dockerapi.metrics.Dimensions; import com.yahoo.vespa.hosted.dockerapi.metrics.GaugeWrapper; @@ -21,7 +20,6 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -29,7 +27,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * Administers a host (for now only docker hosts) and its nodes (docker containers nodes). @@ -53,7 +50,7 @@ public class NodeAdminImpl implements NodeAdmin { private boolean isFrozen; private Instant startOfFreezeConvergence; - private final Map<ContainerName, NodeAgent> nodeAgents = new ConcurrentHashMap<>(); + private final Map<String, NodeAgent> nodeAgentsByHostname = new ConcurrentHashMap<>(); private final GaugeWrapper numberOfContainersInLoadImageState; private final CounterWrapper numberOfUnhandledExceptionsInNodeAgent; @@ -81,15 +78,12 @@ public class NodeAdminImpl implements NodeAdmin { @Override public void refreshContainersToRun(List<NodeSpec> containersToRun) { - final List<ContainerName> existingContainerNames = dockerOperations.getAllManagedContainers().stream() - .map(container -> container.name) - .collect(Collectors.toList()); - final List<String> hostnamesOfContainersToRun = containersToRun.stream() + final Set<String> hostnamesOfContainersToRun = containersToRun.stream() .map(NodeSpec::getHostname) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); storageMaintainer.cleanNodeAdmin(); - synchronizeNodesToNodeAgents(hostnamesOfContainersToRun, existingContainerNames); + synchronizeNodesToNodeAgents(hostnamesOfContainersToRun); dockerOperations.deleteUnusedDockerImages(); updateNodeAgentMetrics(); @@ -99,7 +93,7 @@ public class NodeAdminImpl implements NodeAdmin { int numberContainersWaitingImage = 0; int numberOfNewUnhandledExceptions = 0; - for (NodeAgent nodeAgent : nodeAgents.values()) { + for (NodeAgent nodeAgent : nodeAgentsByHostname.values()) { if (nodeAgent.isDownloadingImage()) numberContainersWaitingImage++; numberOfNewUnhandledExceptions += nodeAgent.getAndResetNumberOfUnhandledExceptions(); } @@ -121,7 +115,7 @@ public class NodeAdminImpl implements NodeAdmin { } // Use filter with count instead of allMatch() because allMatch() will short circuit on first non-match - boolean allNodeAgentsConverged = nodeAgents.values().stream() + boolean allNodeAgentsConverged = nodeAgentsByHostname.values().stream() .filter(nodeAgent -> !nodeAgent.setFrozen(wantFrozen)) .count() == 0; @@ -149,13 +143,13 @@ public class NodeAdminImpl implements NodeAdmin { @Override public void stopNodeAgentServices(List<String> hostnames) { // Each container may spend 1-1:30 minutes stopping - nodeAgents.values().parallelStream() + nodeAgentsByHostname.values().parallelStream() .filter(nodeAgent -> hostnames.contains(nodeAgent.getHostname())) .forEach(NodeAgent::stopServices); } public int getNumberOfNodeAgents() { - return nodeAgents.keySet().size(); + return nodeAgentsByHostname.keySet().size(); } @Override @@ -163,7 +157,7 @@ public class NodeAdminImpl implements NodeAdmin { Map<String, Object> debug = new LinkedHashMap<>(); debug.put("isFrozen", isFrozen); - List<Map<String, Object>> nodeAgentDebugs = nodeAgents.entrySet().stream() + List<Map<String, Object>> nodeAgentDebugs = nodeAgentsByHostname.entrySet().stream() .map(node -> node.getValue().debugInfo()).collect(Collectors.toList()); debug.put("NodeAgents", nodeAgentDebugs); return debug; @@ -173,7 +167,7 @@ public class NodeAdminImpl implements NodeAdmin { public void start() { metricsScheduler.scheduleAtFixedRate(() -> { try { - nodeAgents.values().forEach(NodeAgent::updateContainerNodeMetrics); + nodeAgentsByHostname.values().forEach(NodeAgent::updateContainerNodeMetrics); } catch (Throwable e) { logger.warning("Metric fetcher scheduler failed", e); } @@ -191,7 +185,7 @@ public class NodeAdminImpl implements NodeAdmin { aclScheduler.shutdown(); // Stop all node-agents in parallel, will block until the last NodeAgent is stopped - nodeAgents.values().parallelStream().forEach(NodeAgent::stop); + nodeAgentsByHostname.values().parallelStream().forEach(NodeAgent::stop); do { try { @@ -210,55 +204,34 @@ public class NodeAdminImpl implements NodeAdmin { return result; } - // Returns a full outer join of two data sources (of types T and U) on some extractable attribute (of type V). - // Full outer join means that all elements of both data sources are included in the result, - // even when there is no corresponding element (having the same attribute) in the other data set, - // in which case the value from the other source will be empty. - static <T, U, V> Stream<Pair<Optional<T>, Optional<U>>> fullOuterJoin( - final Stream<T> tStream, final Function<T, V> tAttributeExtractor, - final Stream<U> uStream, final Function<U, V> uAttributeExtractor) { - final Map<V, T> tMap = tStream.collect(Collectors.toMap(tAttributeExtractor, t -> t)); - final Map<V, U> uMap = uStream.collect(Collectors.toMap(uAttributeExtractor, u -> u)); - return Stream.concat(tMap.keySet().stream(), uMap.keySet().stream()) - .distinct() - .map(key -> new Pair<>(Optional.ofNullable(tMap.get(key)), Optional.ofNullable(uMap.get(key)))); - } + void synchronizeNodesToNodeAgents(Set<String> hostnamesToRun) { + Map<String, Container> runningContainersByHostname = dockerOperations.getAllManagedContainers().stream() + .collect(Collectors.toMap(c -> c.hostname, c -> c)); - // The method streams the list of containers twice. - void synchronizeNodesToNodeAgents( - final List<String> hostnamesToRun, - final List<ContainerName> existingContainers) { - final Map<ContainerName, String> hostnameByContainerName = hostnamesToRun.stream() - .collect(Collectors.toMap(ContainerName::fromHostname, i -> i)); - final Stream<Pair<Optional<ContainerName>, Optional<ContainerName>>> nodeSpecContainerPairs = fullOuterJoin( - hostnameByContainerName.keySet().stream(), containerName -> containerName, - existingContainers.stream(), containerName -> containerName); - - final Set<ContainerName> obsoleteAgentContainerNames = diff(nodeAgents.keySet(), hostnameByContainerName.keySet()); - obsoleteAgentContainerNames.forEach(containerName -> nodeAgents.remove(containerName).stop()); - - nodeSpecContainerPairs.forEach(nodeSpecContainerPair -> { - final Optional<ContainerName> nodeSpec = nodeSpecContainerPair.getFirst(); - final Optional<ContainerName> existingContainer = nodeSpecContainerPair.getSecond(); - - if (!nodeSpec.isPresent()) { - assert existingContainer.isPresent(); - logger.warning("Container " + existingContainer.get().asString() + " exists, but is not in node repository runlist"); - return; - } + // Stop and remove NodeAgents that should no longer be running + diff(nodeAgentsByHostname.keySet(), hostnamesToRun) + .forEach(hostname -> nodeAgentsByHostname.remove(hostname).stop()); + + // Start NodeAgent for hostnames that should be running, but aren't yet + diff(hostnamesToRun, nodeAgentsByHostname.keySet()) + .forEach(this::startNodeAgent); - ensureNodeAgentForNodeIsStarted(nodeSpec.get(), hostnameByContainerName.get(nodeSpec.get())); - }); + // Remove containers that are running, but have no NodeAgent managing it (and after the previous steps, + // these containers shouldn't be running, otherwise a NodeAgent would have been created) + diff(runningContainersByHostname.keySet(), nodeAgentsByHostname.keySet()) + .forEach(hostname -> dockerOperations.removeContainer(runningContainersByHostname.get(hostname))); } - private void ensureNodeAgentForNodeIsStarted(ContainerName containerName, String hostname) { - if (nodeAgents.containsKey(containerName)) { - return; - } + private void startNodeAgent(String hostname) { + if (nodeAgentsByHostname.containsKey(hostname)) + throw new IllegalArgumentException("Attempted to start NodeAgent for hostname " + hostname + + ", but one is already running!"); - final NodeAgent agent = nodeAgentFactory.apply(hostname); + NodeAgent agent = nodeAgentFactory.apply(hostname); agent.start(); - nodeAgents.put(containerName, agent); + nodeAgentsByHostname.put(hostname, agent); + + // Sleep between starting NodeAgents to reduce simultaneous load against docker daemon when node-admin starts try { Thread.sleep(1000); } catch (InterruptedException e) { |