diff options
author | Harald Musum <musum@verizonmedia.com> | 2020-04-29 13:55:13 +0200 |
---|---|---|
committer | Harald Musum <musum@verizonmedia.com> | 2020-04-29 13:55:13 +0200 |
commit | 9f492628f7fa9c5f59c50d97af37907581bcc015 (patch) | |
tree | 8fdbeea15b290577d04b4725c6e816d7f0420325 /zkfacade | |
parent | a8fc679943ae9bdc6d6d58d82a2b9f43dac54ad4 (diff) |
Wait longer for servers to reach barrier
1. Wait up to 2 seconds for all to reach barrier.
2. If not, wait up to 4 seconds for the server that waits for the barrier to
be one of the respondents AND for a majority of servers to have reached barrier.
3. If not, wait for a majority of servers to have reached barrier.
Diffstat (limited to 'zkfacade')
4 files changed, 52 insertions, 23 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java index 6d4f6beece1..37b1fa1c9fb 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java @@ -212,11 +212,11 @@ public class Curator implements AutoCloseable { } public CompletionWaiter getCompletionWaiter(Path waiterPath, int numMembers, String id) { - return CuratorCompletionWaiter.create(curatorFramework, waiterPath, numMembers, id); + return CuratorCompletionWaiter.create(this, waiterPath, id); } public CompletionWaiter createCompletionWaiter(Path parentPath, String waiterNode, int numMembers, String id) { - return CuratorCompletionWaiter.createAndInitialize(this, parentPath, waiterNode, numMembers, id); + return CuratorCompletionWaiter.createAndInitialize(this, parentPath, waiterNode, id); } /** Creates a listenable cache which keeps in sync with changes to all the immediate children of a path */ diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java index d71e23a0623..537ec2ae751 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java @@ -3,7 +3,6 @@ package com.yahoo.vespa.curator; import java.util.logging.Level; import com.yahoo.path.Path; -import org.apache.curator.framework.CuratorFramework; import java.time.Clock; import java.time.Duration; @@ -21,14 +20,12 @@ import java.util.List; class CuratorCompletionWaiter implements Curator.CompletionWaiter { private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(CuratorCompletionWaiter.class.getName()); - private final CuratorFramework curator; + private final Curator curator; private final String barrierPath; private final String myId; - private 
final int memberQty; private final Clock clock; - CuratorCompletionWaiter(int barrierMembers, CuratorFramework curator, String barrierPath, String myId, Clock clock) { - this.memberQty = barrierMembers; + CuratorCompletionWaiter(Curator curator, String barrierPath, String myId, Clock clock) { this.myId = barrierPath + "/" + myId; this.curator = curator; this.barrierPath = barrierPath; @@ -45,27 +42,51 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter { } catch (Exception e) { throw new RuntimeException(e); } - if (respondents.size() < memberQty) { + if (respondents.size() < barrierMemberCount()) { throw new CompletionTimeoutException("Timed out waiting for peer config servers to complete operation. " + - "Got response from " + respondents + ", but need response from " + - "at least " + memberQty + " server(s). " + - "Timeout passed as argument was " + timeout.toMillis() + " ms"); + "Got response from " + respondents + ", but need response from " + + "at least " + barrierMemberCount() + " server(s). 
" + + "Timeout passed as argument was " + timeout.toMillis() + " ms"); } } private List<String> awaitInternal(Duration timeout) throws Exception { - Instant endTime = clock.instant().plus(timeout); + Instant startTime = clock.instant(); + Instant endTime = startTime.plus(timeout); List<String> respondents; do { - respondents = curator.getChildren().forPath(barrierPath); - if (respondents.size() >= memberQty) { + respondents = curator.framework().getChildren().forPath(barrierPath); + + // First, check if all config servers responded + if (respondents.size() == curator.zooKeeperEnsembleCount()) { + log.log(Level.FINE, barrierCompletedMessage(respondents, startTime)); + break; + } + // Then, if some are missing after 2 seconds, allow if the server this code is running on is one of the repondents + if (usedMoreTimeThan(Duration.ofSeconds(2), startTime) && respondents.contains(myId) && respondents.size() >= barrierMemberCount()) { + log.log(Level.INFO, barrierCompletedMessage(respondents, startTime)); break; } + // If some are still missing after 4 seconds, quorum is enough + if (usedMoreTimeThan(Duration.ofSeconds(4), startTime) && respondents.size() >= barrierMemberCount()) { + log.log(Level.INFO, barrierCompletedMessage(respondents, startTime)); + break; + } + Thread.sleep(100); } while (clock.instant().isBefore(endTime)); + return respondents; } + private boolean usedMoreTimeThan(Duration waitTime, Instant startTime) { + return clock.instant().isAfter(startTime.plus(waitTime)); + } + + private String barrierCompletedMessage(List<String> respondents, Instant startTime) { + return barrierPath + " completed in " + Duration.between(startTime, Instant.now()).toString() + + ", " + respondents.size() + "/" + curator.zooKeeperEnsembleCount() + " responded: " + respondents; + } @Override public void notifyCompletion() { @@ -77,22 +98,27 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter { } private void notifyInternal() throws Exception { - 
curator.create().forPath(myId); + curator.framework().create().forPath(myId); } @Override public String toString() { - return "'" + barrierPath + "', " + memberQty + " members"; + return "'" + barrierPath + "', " + barrierMemberCount() + " members"; } - public static Curator.CompletionWaiter create(CuratorFramework curator, Path barrierPath, int numMembers, String id) { - return new CuratorCompletionWaiter(numMembers, curator, barrierPath.getAbsolute(), id, Clock.systemUTC()); + public static Curator.CompletionWaiter create(Curator curator, Path barrierPath, String id) { + return new CuratorCompletionWaiter(curator, barrierPath.getAbsolute(), id, Clock.systemUTC()); } - public static Curator.CompletionWaiter createAndInitialize(Curator curator, Path parentPath, String waiterNode, int numMembers, String id) { + public static Curator.CompletionWaiter createAndInitialize(Curator curator, Path parentPath, String waiterNode, String id) { Path waiterPath = parentPath.append(waiterNode); curator.delete(waiterPath); curator.createAtomically(parentPath, waiterPath); - return new CuratorCompletionWaiter(numMembers, curator.framework(), waiterPath.getAbsolute(), id, Clock.systemUTC()); + return new CuratorCompletionWaiter(curator, waiterPath.getAbsolute(), id, Clock.systemUTC()); } + + private int barrierMemberCount() { + return (curator.zooKeeperEnsembleCount() / 2) + 1; // majority + } + } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java index f5848a07109..8e4a4530493 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java @@ -341,6 +341,9 @@ public class MockCurator extends Curator { return new MockFileCache(Path.fromString(path)); } + @Override + public int zooKeeperEnsembleCount() { return 1; } + /** * Invocation of changes to the file system state is abstracted through 
this to allow transactional * changes to notify on commit diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorCompletionWaiterTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorCompletionWaiterTest.java index d1ce744de7f..54bba4aa619 100644 --- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorCompletionWaiterTest.java +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorCompletionWaiterTest.java @@ -17,8 +17,8 @@ public class CuratorCompletionWaiterTest { @Test public void testCompletionWaiter() throws InterruptedException { Curator curator = new MockCurator(); - Curator.CompletionWaiter waiter = CuratorCompletionWaiter.createAndInitialize(curator, Path.createRoot(), "foo", 1, "foo"); - Curator.CompletionWaiter notifier = CuratorCompletionWaiter.create(curator.framework(), Path.fromString("/foo"), 1, "bar"); + Curator.CompletionWaiter waiter = CuratorCompletionWaiter.createAndInitialize(curator, Path.createRoot(), "foo", "foo"); + Curator.CompletionWaiter notifier = CuratorCompletionWaiter.create(curator, Path.fromString("/foo"), "bar"); Thread t1 = new Thread(() -> { try { waiter.awaitCompletion(Duration.ofSeconds(120)); @@ -34,7 +34,7 @@ public class CuratorCompletionWaiterTest { @Test(expected = CompletionTimeoutException.class) public void testCompletionWaiterFailure() { Curator curator = new MockCurator(); - Curator.CompletionWaiter waiter = CuratorCompletionWaiter.createAndInitialize(curator, Path.createRoot(), "foo", 1, "foo"); + Curator.CompletionWaiter waiter = CuratorCompletionWaiter.createAndInitialize(curator, Path.createRoot(), "foo", "foo"); waiter.awaitCompletion(Duration.ofMillis(100)); } } |