diff options
author | Harald Musum <musum@yahooinc.com> | 2023-03-07 11:32:49 +0100 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2023-03-07 11:32:49 +0100 |
commit | c80b55b534558ab4a01441774a0f60a8d87ab66f (patch) | |
tree | 6fff9fa7487f08f3a1b250bf46bbd69832233d70 /zkfacade | |
parent | ad3f5edcb4d86886a7f6c52c26449e683e1d3a67 (diff) |
Wait longer for all users of barrier.
Use logic from RemoveApplicationWaiter in CuratorCompletionWaiter as well:
Wait some time after a quorum has reached the barrier to see if another member
reaches the barrier as well. Return anyway after some time.
Diffstat (limited to 'zkfacade')
-rw-r--r-- | zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java | 52 | ||||
-rw-r--r-- | zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java | 3 |
2 files changed, 38 insertions, 17 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java index d5207c6fab8..8df37d1f6ce 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java @@ -11,9 +11,9 @@ import java.util.List; import java.util.logging.Level; /** - * Implementation of a Barrier that handles the case where more than number of members can call synchronize. If - * the number of members that synchronize exceed the expected number, the other members are immediately allowed - * to pass through the barrier. + * Implementation of a Barrier that handles the case where more than number of members can call synchronize. + * Will wait for some time for all servers to do the operation, but will accept the majority of servers to have + * done the operation if it takes longer than a specified amount of time. * * @author Vegard Havdal * @author Ulf Lilleengen @@ -21,16 +21,20 @@ import java.util.logging.Level; class CuratorCompletionWaiter implements Curator.CompletionWaiter { private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(CuratorCompletionWaiter.class.getName()); + private static final Duration waitForAllDefault = Duration.ofSeconds(1); // Make this configurable? 
+ private final Curator curator; private final String barrierPath; private final String myId; private final Clock clock; + private final Duration waitForAll; - CuratorCompletionWaiter(Curator curator, String barrierPath, String myId, Clock clock) { + CuratorCompletionWaiter(Curator curator, String barrierPath, String myId, Clock clock, Duration waitForAll) { this.myId = barrierPath + "/" + myId; this.curator = curator; this.barrierPath = barrierPath; this.clock = clock; + this.waitForAll = waitForAll; } @Override @@ -55,6 +59,7 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter { private List<String> awaitInternal(Duration timeout) throws Exception { Instant startTime = clock.instant(); Instant endTime = startTime.plus(timeout); + Instant gotQuorumTime = Instant.EPOCH; List<String> respondents = new ArrayList<>(); do { @@ -65,15 +70,20 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter { respondents + ", all participants: " + curator.zooKeeperEnsembleConnectionSpec()); } - // First, check if all config servers responded + // If all config servers responded, return if (respondents.size() == curator.zooKeeperEnsembleCount()) { - log.log(Level.FINE, () -> barrierCompletedMessage(respondents, startTime)); + logBarrierCompleted(respondents, startTime); break; } - // If some are missing, quorum is enough + // If some are missing, quorum is enough, but wait for all up to ´waitForAll´ seconds before returning if (respondents.size() >= barrierMemberCount()) { - log.log(Level.FINE, () -> barrierCompletedMessage(respondents, startTime)); - break; + if (gotQuorumTime.isBefore(startTime)) + gotQuorumTime = clock.instant(); + + if (Duration.between(clock.instant(), gotQuorumTime.plus(waitForAll)).isNegative()) { + logBarrierCompleted(respondents, startTime); + break; + } } Thread.sleep(100); @@ -82,9 +92,15 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter { return respondents; } - private String 
barrierCompletedMessage(List<String> respondents, Instant startTime) { - return barrierPath + " completed in " + Duration.between(startTime, Instant.now()).toString() + - ", " + respondents.size() + "/" + curator.zooKeeperEnsembleCount() + " responded: " + respondents; + private void logBarrierCompleted(List<String> respondents, Instant startTime) { + Duration duration = Duration.between(startTime, Instant.now()); + Level level = duration.minus(Duration.ofSeconds(5)).isNegative() ? Level.FINE : Level.INFO; + log.log(level, () -> barrierCompletedMessage(respondents, duration)); + } + + private String barrierCompletedMessage(List<String> respondents, Duration duration) { + return barrierPath + " completed in " + duration.toString() + + ", " + respondents.size() + "/" + curator.zooKeeperEnsembleCount() + " responded: " + respondents; } @Override @@ -106,10 +122,18 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter { } public static Curator.CompletionWaiter create(Curator curator, Path barrierPath, String id) { - return new CuratorCompletionWaiter(curator, barrierPath.getAbsolute(), id, Clock.systemUTC()); + return create(curator, barrierPath, id, waitForAllDefault); + } + + public static Curator.CompletionWaiter create(Curator curator, Path barrierPath, String id, Duration waitForAll) { + return new CuratorCompletionWaiter(curator, barrierPath.getAbsolute(), id, Clock.systemUTC(), waitForAll); } public static Curator.CompletionWaiter createAndInitialize(Curator curator, Path parentPath, String waiterNode, String id) { + return createAndInitialize(curator, parentPath, waiterNode, id, waitForAllDefault); + } + + public static Curator.CompletionWaiter createAndInitialize(Curator curator, Path parentPath, String waiterNode, String id, Duration waitForAll) { Path waiterPath = parentPath.append(waiterNode); String debugMessage = log.isLoggable(Level.FINE) ? 
"Recreating ZK path " + waiterPath : null; @@ -120,7 +144,7 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter { if (debugMessage != null) log.fine(debugMessage + ": Done"); - return new CuratorCompletionWaiter(curator, waiterPath.getAbsolute(), id, Clock.systemUTC()); + return new CuratorCompletionWaiter(curator, waiterPath.getAbsolute(), id, Clock.systemUTC(), waitForAll); } private int barrierMemberCount() { diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java index c8566015ea1..e5810763bf2 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java @@ -84,13 +84,10 @@ import org.apache.curator.retry.RetryForever; import org.apache.curator.utils.EnsurePath; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.BadVersionException; -import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; - import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayList; |