From 3455f876114c1220ecd04be9cc9e856597f55ea2 Mon Sep 17 00:00:00 2001 From: jonmv Date: Mon, 28 Aug 2023 14:38:36 +0200 Subject: Issue rebuild commands in parallel, since lock is shared --- .../hosted/provision/maintenance/DiskReplacer.java | 56 +++++++++++++++------- 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/DiskReplacer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/DiskReplacer.java index 5baa4f63867..f53eae4ee2b 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/DiskReplacer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/DiskReplacer.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.provision.maintenance; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.jdisc.Metric; import com.yahoo.vespa.hosted.provision.Node; import com.yahoo.vespa.hosted.provision.NodeList; @@ -8,9 +9,16 @@ import com.yahoo.vespa.hosted.provision.NodeMutex; import com.yahoo.vespa.hosted.provision.NodeRepository; import com.yahoo.vespa.hosted.provision.provisioning.HostProvisioner; import com.yahoo.yolean.Exceptions; +import com.yahoo.yolean.UncheckedInterruptedException; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.logging.Level; import java.util.logging.Logger; @@ -24,6 +32,7 @@ public class DiskReplacer extends NodeRepositoryMaintainer { private static final Logger log = Logger.getLogger(DiskReplacer.class.getName()); private final HostProvisioner hostProvisioner; + private final ExecutorService executor = Executors.newCachedThreadPool(new DaemonThreadFactory("disk-replacer")); DiskReplacer(NodeRepository nodeRepository, Duration interval, Metric metric, HostProvisioner hostProvisioner) { super(nodeRepository, interval, metric); @@ -33,26 +42,41 @@ public class DiskReplacer extends NodeRepositoryMaintainer { @Override protected double maintain() { NodeList nodes = nodeRepository().nodes().list().rebuilding(true); + int attempts = 0; int failures = 0; - for (var host : nodes) { - Optional optionalMutex = nodeRepository().nodes().lockAndGet(host, Duration.ofSeconds(10)); - if (optionalMutex.isEmpty()) continue; - try (NodeMutex mutex = optionalMutex.get()) { - // Re-check flag while holding lock - host = mutex.node(); - if (!host.status().wantToRebuild()) { - continue; + try (var locked = nodeRepository().nodes().lockAndGetAll(nodes.asList(), Optional.of(Duration.ofSeconds(10)))) { + Map> rebuilt = new HashMap<>(); + for (NodeMutex node : locked.nodes()) { + if (node.node().status().wantToRebuild()) { + ++attempts; + rebuilt.put(node.node().hostname(), executor.submit(() -> hostProvisioner.replaceRootDisk(node.node()))); } - Node updatedNode = hostProvisioner.replaceRootDisk(host); - if (!updatedNode.status().wantToRebuild()) { - nodeRepository().nodes().write(updatedNode, mutex); + } + + for (var node : rebuilt.entrySet()) { + try { + Node updated = node.getValue().get(); + if ( ! updated.status().wantToRebuild()) { + nodeRepository().nodes().write(updated, () -> { }); + } + } + catch (ExecutionException e) { + ++failures; + log.log(Level.WARNING, "Failed to rebuild " + node.getKey() + ", will retry in " + + interval() + ": " + Exceptions.toMessageString(e.getCause())); + } + catch (InterruptedException e) { + throw new UncheckedInterruptedException(e, true); } - } catch (RuntimeException e) { - failures++; - log.log(Level.WARNING, "Failed to rebuild " + host.hostname() + ", will retry in " + - interval() + ": " + Exceptions.toMessageString(e)); } } - return this.asSuccessFactorDeviation(nodes.size(), failures); + return this.asSuccessFactorDeviation(attempts, failures); } + + @Override + public void shutdown() { + super.shutdown(); + executor.shutdown(); + } + } -- cgit v1.2.3