diff options
author | Martin Polden <mpolden@mpolden.no> | 2019-05-15 11:02:22 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2019-05-15 14:41:23 +0200 |
commit | fba175912a3743f2feb0c37957a3637996102030 (patch) | |
tree | 5627133b722d926f0cfb8119d4a6c6f2077164df /node-repository | |
parent | 018b97d17a6f47a9105df1a5e61ba10413f3d740 (diff) |
Require lock reference for all write operations
Diffstat (limited to 'node-repository')
12 files changed, 42 insertions, 38 deletions
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/NodeRepository.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/NodeRepository.java index 16105181ab3..ce6ec8fd7fe 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/NodeRepository.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/NodeRepository.java @@ -46,7 +46,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeSet; -import java.util.function.UnaryOperator; +import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -399,7 +399,7 @@ public class NodeRepository extends AbstractComponent { List<Node> removableNodes = nodes.stream().map(node -> node.with(node.allocation().get().removable())) .collect(Collectors.toList()); - write(removableNodes); + write(removableNodes, lock); } } @@ -423,7 +423,7 @@ public class NodeRepository extends AbstractComponent { /** Move nodes to the dirty state */ public List<Node> setDirty(List<Node> nodes, Agent agent, String reason) { - return performOn(NodeListFilter.from(nodes), node -> setDirty(node, agent, reason)); + return performOn(NodeListFilter.from(nodes), (node, lock) -> setDirty(node, agent, reason)); } /** @@ -642,7 +642,7 @@ public class NodeRepository extends AbstractComponent { * Returns the nodes in their new state. */ public List<Node> restart(NodeFilter filter) { - return performOn(StateFilter.from(Node.State.active, filter), node -> write(node.withRestart(node.allocation().get().restartGeneration().withIncreasedWanted()))); + return performOn(StateFilter.from(Node.State.active, filter), (node, lock) -> write(node.withRestart(node.allocation().get().restartGeneration().withIncreasedWanted()), lock)); } /** @@ -650,24 +650,28 @@ public class NodeRepository extends AbstractComponent { * Returns the nodes in their new state. */ public List<Node> reboot(NodeFilter filter) { - return performOn(filter, node -> write(node.withReboot(node.status().reboot().withIncreasedWanted()))); + return performOn(filter, (node, lock) -> write(node.withReboot(node.status().reboot().withIncreasedWanted()), lock)); } /** * Writes this node after it has changed some internal state but NOT changed its state field. - * This does NOT lock the node repository. + * This does NOT lock the node repository implicitly, but callers are expected to already hold the lock. * + * @param lock Already acquired lock * @return the written node for convenience */ - public Node write(Node node) { return db.writeTo(node.state(), node, Agent.system, Optional.empty()); } + public Node write(Node node, Mutex lock) { return write(List.of(node), lock).get(0); } /** * Writes these nodes after they have changed some internal state but NOT changed their state field. - * This does NOT lock the node repository. + * This does NOT lock the node repository implicitly, but callers are expected to already hold the lock. * + * @param lock Already acquired lock * @return the written nodes for convenience */ - public List<Node> write(List<Node> nodes) { return db.writeTo(nodes, Agent.system, Optional.empty()); } + public List<Node> write(List<Node> nodes, @SuppressWarnings("unused") Mutex lock) { + return db.writeTo(nodes, Agent.system, Optional.empty()); + } /** * Performs an operation requiring locking on all nodes matching some filter. @@ -676,7 +680,7 @@ public class NodeRepository extends AbstractComponent { * @param action the action to perform * @return the set of nodes on which the action was performed, as they became as a result of the operation */ - private List<Node> performOn(NodeFilter filter, UnaryOperator<Node> action) { + private List<Node> performOn(NodeFilter filter, BiFunction<Node, Mutex, Node> action) { List<Node> unallocatedNodes = new ArrayList<>(); ListMap<ApplicationId, Node> allocatedNodes = new ListMap<>(); @@ -693,12 +697,12 @@ public class NodeRepository extends AbstractComponent { List<Node> resultingNodes = new ArrayList<>(); try (Mutex lock = lockAllocation()) { for (Node node : unallocatedNodes) - resultingNodes.add(action.apply(node)); + resultingNodes.add(action.apply(node, lock)); } for (Map.Entry<ApplicationId, List<Node>> applicationNodes : allocatedNodes.entrySet()) { try (Mutex lock = lock(applicationNodes.getKey())) { for (Node node : applicationNodes.getValue()) - resultingNodes.add(action.apply(node)); + resultingNodes.add(action.apply(node, lock)); } } return resultingNodes; diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/HostProvisionMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/HostProvisionMaintainer.java index a21d753b4cd..ea2707dcd0b 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/HostProvisionMaintainer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/HostProvisionMaintainer.java @@ -50,7 +50,7 @@ public class HostProvisionMaintainer extends Maintainer { candidates(nodes).forEach((host, children) -> { try { List<Node> updatedNodes = hostProvisioner.provision(host, children); - nodeRepository().write(updatedNodes); + nodeRepository().write(updatedNodes, lock); } catch (IllegalArgumentException | IllegalStateException e) { log.log(Level.INFO, "Failed to provision " + host.hostname() + ": " + Exceptions.toMessageString(e)); } catch (FatalProvisioningException e) { diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeFailer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeFailer.java index f0d9f9f314f..8102f5cc305 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeFailer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeFailer.java @@ -95,7 +95,7 @@ public class NodeFailer extends Maintainer { // Ready nodes try (Mutex lock = nodeRepository().lockAllocation()) { - updateNodeLivenessEventsForReadyNodes(); + updateNodeLivenessEventsForReadyNodes(lock); for (Map.Entry<Node, String> entry : getReadyNodesByFailureReason().entrySet()) { Node node = entry.getKey(); @@ -129,7 +129,7 @@ public class NodeFailer extends Maintainer { metric.set(throttledNodeFailuresMetric, throttledNodeFailures, null); } - private void updateNodeLivenessEventsForReadyNodes() { + private void updateNodeLivenessEventsForReadyNodes(Mutex lock) { // Update node last request events through ZooKeeper to collect request to all config servers. // We do this here ("lazily") to avoid writing to zk for each config request. for (Node node : nodeRepository().getNodes(Node.State.ready)) { @@ -139,7 +139,7 @@ public class NodeFailer extends Maintainer { if (! node.history().hasEventAfter(History.Event.Type.requested, lastLocalRequest.get())) { History updatedHistory = node.history() .with(new History.Event(History.Event.Type.requested, Agent.system, lastLocalRequest.get())); - nodeRepository().write(node.with(updatedHistory)); + nodeRepository().write(node.with(updatedHistory), lock); } } } @@ -329,7 +329,7 @@ public class NodeFailer extends Maintainer { try (Mutex lock = nodeRepository().lock(node.allocation().get().owner())) { node = nodeRepository().getNode(node.hostname(), Node.State.active).get(); // re-get inside lock - return nodeRepository().write(node.downAt(clock.instant())); + return nodeRepository().write(node.downAt(clock.instant()), lock); } } @@ -338,7 +338,7 @@ public class NodeFailer extends Maintainer { try (Mutex lock = nodeRepository().lock(node.allocation().get().owner())) { node = nodeRepository().getNode(node.hostname(), Node.State.active).get(); // re-get inside lock - nodeRepository().write(node.up()); + nodeRepository().write(node.up(), lock); } } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRetirer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRetirer.java index 89b180744f1..0245f2a92a3 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRetirer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRetirer.java @@ -86,7 +86,7 @@ public class NodeRetirer extends Maintainer { if (! flavorSpareChecker.canRetireUnallocatedNodeWithFlavor(nodeToRetire.flavor())) break; retirementPolicy.shouldRetire(nodeToRetire).ifPresent(reason -> { - nodeRepository().write(nodeToRetire.with(nodeToRetire.status().withWantToDeprovision(true))); + nodeRepository().write(nodeToRetire.with(nodeToRetire.status().withWantToDeprovision(true)), lock); nodeRepository().park(nodeToRetire.hostname(), false, Agent.NodeRetirer, reason); iter.remove(); }); @@ -160,7 +160,7 @@ public class NodeRetirer extends Maintainer { Node updatedNode = node.with(node.status() .withWantToRetire(true) .withWantToDeprovision(true)); - nodeRepository().write(updatedNode); + nodeRepository().write(updatedNode, lock); })); } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/v2/NodesApiHandler.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/v2/NodesApiHandler.java index d96e62ec730..74e4bd7b4d2 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/v2/NodesApiHandler.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/restapi/v2/NodesApiHandler.java @@ -137,7 +137,7 @@ public class NodesApiHandler extends LoggingRequestHandler { if (path.startsWith("/nodes/v2/node/")) { Node node = nodeFromRequest(request); try (var lock = nodeRepository.lock(node)) { - nodeRepository.write(new NodePatcher(nodeFlavors, request.getData(), node, nodeRepository).apply()); + nodeRepository.write(new NodePatcher(nodeFlavors, request.getData(), node, nodeRepository).apply(), lock); } return new MessageResponse("Updated " + node.hostname()); } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/FailedExpirerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/FailedExpirerTest.java index 23b71cc8b6e..0da799eed30 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/FailedExpirerTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/FailedExpirerTest.java @@ -292,7 +292,7 @@ public class FailedExpirerTest { public FailureScenario failNode(int times, String... hostname) { Stream.of(hostname).forEach(h -> { Node node = get(h); - nodeRepository.write(node.with(node.status().setFailCount(times))); + nodeRepository.write(node.with(node.status().setFailCount(times)), () -> {}); nodeRepository.fail(h, Agent.system, "Failed by unit test"); }); return this; @@ -302,7 +302,7 @@ public class FailedExpirerTest { Stream.of(hostname).forEach(h -> { Node node = get(h); nodeRepository.write(node.with(node.status().withHardwareFailureDescription( - Optional.of("memory_mcelog")))); + Optional.of("memory_mcelog"))), () -> {}); nodeRepository.fail(h, Agent.system, "Failed by unit test"); }); return this; diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/NodeFailerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/NodeFailerTest.java index 1d988b6283a..07e5db305b6 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/NodeFailerTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/NodeFailerTest.java @@ -49,7 +49,7 @@ public class NodeFailerTest { .orElse(node.hostname().equals(hostWithHwFailure))) .forEach(node -> { Node updatedNode = node.with(node.status().withHardwareFailureDescription(Optional.of("HW failure"))); - tester.nodeRepository.write(updatedNode); + tester.nodeRepository.write(updatedNode, () -> {}); }); testNodeFailingWith(tester, hostWithHwFailure); @@ -66,7 +66,7 @@ public class NodeFailerTest { .filter(node -> node.hostname().equals(hostWithFailureReports)) .forEach(node -> { Node updatedNode = node.with(node.reports().withReport(badTotalMemorySizeReport)); - tester.nodeRepository.write(updatedNode); + tester.nodeRepository.write(updatedNode, () -> {}); }); testNodeFailingWith(tester, hostWithFailureReports); @@ -136,7 +136,7 @@ public class NodeFailerTest { .filter(node -> node.hostname().equals(hostWithFailureReports)) .forEach(node -> { Node updatedNode = node.with(node.reports().withReport(badTotalMemorySizeReport)); - tester.nodeRepository.write(updatedNode); + tester.nodeRepository.write(updatedNode, () -> {}); }); // The ready node will be failed, but neither the host nor the 2 active nodes since they have not been suspended @@ -215,8 +215,8 @@ public class NodeFailerTest { // Hardware failures are detected on two ready nodes, which are then failed Node readyFail1 = tester.nodeRepository.getNodes(NodeType.tenant, Node.State.ready).get(2); Node readyFail2 = tester.nodeRepository.getNodes(NodeType.tenant, Node.State.ready).get(3); - tester.nodeRepository.write(readyFail1.with(readyFail1.status().withHardwareFailureDescription(Optional.of("memory_mcelog")))); - tester.nodeRepository.write(readyFail2.with(readyFail2.status().withHardwareFailureDescription(Optional.of("disk_smart")))); + tester.nodeRepository.write(readyFail1.with(readyFail1.status().withHardwareFailureDescription(Optional.of("memory_mcelog"))), () -> {}); + tester.nodeRepository.write(readyFail2.with(readyFail2.status().withHardwareFailureDescription(Optional.of("disk_smart"))), () -> {}); assertEquals(4, tester.nodeRepository.getNodes(NodeType.tenant, Node.State.ready).size()); tester.failer.run(); assertEquals(2, tester.nodeRepository.getNodes(NodeType.tenant, Node.State.ready).size()); @@ -540,7 +540,7 @@ public class NodeFailerTest { assertEquals(Node.State.ready, readyNode.state()); tester.nodeRepository.write(readyNode.with(readyNode.status() - .withHardwareDivergence(Optional.of("{\"specVerificationReport\":{\"actualIpv6Connection\":false}}")))); + .withHardwareDivergence(Optional.of("{\"specVerificationReport\":{\"actualIpv6Connection\":false}}"))), () -> {}); tester.failer.run(); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRebooterTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRebooterTest.java index 2e0d43070b9..ce00e0f4033 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRebooterTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRebooterTest.java @@ -60,7 +60,7 @@ public class NodeRebooterTest { for (Node node : tester.nodeRepository.getNodes(Node.State.ready, Node.State.active)) { if (node.status().reboot().wanted() > node.status().reboot().current()) tester.nodeRepository.write(node.withCurrentRebootGeneration(node.status().reboot().wanted(), - tester.clock.instant())); + tester.clock.instant()), () -> {}); } } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRetirerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRetirerTest.java index 407747785eb..93e44164f40 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRetirerTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/NodeRetirerTest.java @@ -78,7 +78,7 @@ public class NodeRetirerTest { public void testRetireAllocated() { // Update IP addresses on ready nodes so that when they are deployed to, we wont retire them tester.nodeRepository.getNodes(Node.State.ready) - .forEach(node -> tester.nodeRepository.write(node.with(node.ipConfig().with(Set.of("::2"))))); + .forEach(node -> tester.nodeRepository.write(node.with(node.ipConfig().with(Set.of("::2"))), () -> {})); tester.assertCountsForStateByFlavor(Node.State.active, 9, 4, 8, 11, -1); @@ -132,11 +132,11 @@ public class NodeRetirerTest { assertEquals(expected, actual); Node nodeWantToRetire = tester.nodeRepository.getNode("host3.test.yahoo.com").orElseThrow(RuntimeException::new); - tester.nodeRepository.write(nodeWantToRetire.with(nodeWantToRetire.status().withWantToRetire(true))); + tester.nodeRepository.write(nodeWantToRetire.with(nodeWantToRetire.status().withWantToRetire(true)), () -> {}); Node nodeToFail = tester.nodeRepository.getNode("host5.test.yahoo.com").orElseThrow(RuntimeException::new); tester.nodeRepository.fail(nodeToFail.hostname(), Agent.system, "Failed for unit testing"); Node nodeToUpdate = tester.nodeRepository.getNode("host8.test.yahoo.com").orElseThrow(RuntimeException::new); - tester.nodeRepository.write(nodeToUpdate.with(nodeToUpdate.ipConfig().with(Set.of("::2")))); + tester.nodeRepository.write(nodeToUpdate.with(nodeToUpdate.ipConfig().with(Set.of("::2"))), () -> {}); nodes = tester.nodeRepository.getNodes(app); Set<String> excluded = Stream.of(nodeWantToRetire, nodeToFail, nodeToUpdate).map(Node::hostname).collect(Collectors.toSet()); @@ -153,7 +153,7 @@ public class NodeRetirerTest { // Lets put 3 random nodes in wantToRetire List<Node> nodesToRetire = tester.nodeRepository.getNodes(app).stream().limit(3).collect(Collectors.toList()); - nodesToRetire.forEach(node -> tester.nodeRepository.write(node.with(node.status().withWantToRetire(true)))); + nodesToRetire.forEach(node -> tester.nodeRepository.write(node.with(node.status().withWantToRetire(true)), () -> {})); long actualOneWantToRetire = retirer.getNumberNodesAllowToRetireForCluster(tester.nodeRepository.getNodes(app), 2); assertEquals(0, actualOneWantToRetire); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java index abe28b107f7..43b2657342f 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/NodeTypeProvisioningTest.java @@ -109,7 +109,7 @@ public class NodeTypeProvisioningTest { Node nodeToRetire = tester.nodeRepository().getNodes(NodeType.proxy, Node.State.active).get(5); { // Pick out a node and retire it - tester.nodeRepository().write(nodeToRetire.with(nodeToRetire.status().withWantToRetire(true))); + tester.nodeRepository().write(nodeToRetire.with(nodeToRetire.status().withWantToRetire(true)), () -> {}); List<HostSpec> hosts = deployProxies(application, tester); assertEquals(11, hosts.size()); @@ -178,7 +178,7 @@ public class NodeTypeProvisioningTest { String currentyRetiringHostname; { nodesToRetire.forEach(nodeToRetire -> - tester.nodeRepository().write(nodeToRetire.with(nodeToRetire.status().withWantToRetire(true)))); + tester.nodeRepository().write(nodeToRetire.with(nodeToRetire.status().withWantToRetire(true)), () -> {})); List<HostSpec> hosts = deployProxies(application, tester); assertEquals(11, hosts.size()); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java index 2c237d94715..08fbe73a89a 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java @@ -156,7 +156,7 @@ public class ProvisioningTest { HostSpec host1 = state1.container0.iterator().next(); assertFalse(host1.version().isPresent()); Node node1 = tester.nodeRepository().getNode(host1.hostname()).get(); - tester.nodeRepository().write(node1.with(node1.status().withVespaVersion(Version.fromString("1.2.3")))); + tester.nodeRepository().write(node1.with(node1.status().withVespaVersion(Version.fromString("1.2.3"))), () -> {}); // redeploy SystemState state2 = prepare(application1, 1, 1, 1, 1, new NodeResources(1, 1, 1), tester); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTester.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTester.java index 708ccf486f7..f13dd56b262 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTester.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTester.java @@ -128,7 +128,7 @@ public class ProvisioningTester { public CapacityPolicies capacityPolicies() { return capacityPolicies; } public NodeList getNodes(ApplicationId id, Node.State ... inState) { return new NodeList(nodeRepository.getNodes(id, inState)); } - public void patchNode(Node node) { nodeRepository.write(node); } + public void patchNode(Node node) { nodeRepository.write(node, () -> {}); } public List<HostSpec> prepare(ApplicationId application, ClusterSpec cluster, int nodeCount, int groups, NodeResources flavor) { return prepare(application, cluster, nodeCount, groups, false, flavor); |