summary | refs | log | tree | commit | diff | stats
path: root/zkfacade
diff options
context:
space:
mode:
author: Harald Musum <musum@yahooinc.com> 2023-03-07 11:32:49 +0100
committer: Harald Musum <musum@yahooinc.com> 2023-03-07 11:32:49 +0100
commit: c80b55b534558ab4a01441774a0f60a8d87ab66f (patch)
tree: 6fff9fa7487f08f3a1b250bf46bbd69832233d70 /zkfacade
parent: ad3f5edcb4d86886a7f6c52c26449e683e1d3a67 (diff)
Wait longer for all users of barrier.
Use the logic from RemoveApplicationWaiter in CuratorCompletionWaiter as well: after a quorum has reached the barrier, wait some time to see if another member reaches the barrier as well. Return anyway after some time.
Diffstat (limited to 'zkfacade')
-rw-r--r-- zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java 52
-rw-r--r-- zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java 3
2 files changed, 38 insertions, 17 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java
index d5207c6fab8..8df37d1f6ce 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java
@@ -11,9 +11,9 @@ import java.util.List;
import java.util.logging.Level;
/**
- * Implementation of a Barrier that handles the case where more than number of members can call synchronize. If
- * the number of members that synchronize exceed the expected number, the other members are immediately allowed
- * to pass through the barrier.
+ * Implementation of a Barrier that handles the case where more than number of members can call synchronize.
+ * Will wait for some time for all servers to do the operation, but will accept the majority of servers to have
+ * done the operation if it takes longer than a specified amount of time.
*
* @author Vegard Havdal
* @author Ulf Lilleengen
@@ -21,16 +21,20 @@ import java.util.logging.Level;
class CuratorCompletionWaiter implements Curator.CompletionWaiter {
private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(CuratorCompletionWaiter.class.getName());
+ private static final Duration waitForAllDefault = Duration.ofSeconds(1); // Make this configurable?
+
private final Curator curator;
private final String barrierPath;
private final String myId;
private final Clock clock;
+ private final Duration waitForAll;
- CuratorCompletionWaiter(Curator curator, String barrierPath, String myId, Clock clock) {
+ CuratorCompletionWaiter(Curator curator, String barrierPath, String myId, Clock clock, Duration waitForAll) {
this.myId = barrierPath + "/" + myId;
this.curator = curator;
this.barrierPath = barrierPath;
this.clock = clock;
+ this.waitForAll = waitForAll;
}
@Override
@@ -55,6 +59,7 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter {
private List<String> awaitInternal(Duration timeout) throws Exception {
Instant startTime = clock.instant();
Instant endTime = startTime.plus(timeout);
+ Instant gotQuorumTime = Instant.EPOCH;
List<String> respondents = new ArrayList<>();
do {
@@ -65,15 +70,20 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter {
respondents + ", all participants: " + curator.zooKeeperEnsembleConnectionSpec());
}
- // First, check if all config servers responded
+ // If all config servers responded, return
if (respondents.size() == curator.zooKeeperEnsembleCount()) {
- log.log(Level.FINE, () -> barrierCompletedMessage(respondents, startTime));
+ logBarrierCompleted(respondents, startTime);
break;
}
- // If some are missing, quorum is enough
+ // If some are missing, quorum is enough, but wait for all up to ´waitForAll´ seconds before returning
if (respondents.size() >= barrierMemberCount()) {
- log.log(Level.FINE, () -> barrierCompletedMessage(respondents, startTime));
- break;
+ if (gotQuorumTime.isBefore(startTime))
+ gotQuorumTime = clock.instant();
+
+ if (Duration.between(clock.instant(), gotQuorumTime.plus(waitForAll)).isNegative()) {
+ logBarrierCompleted(respondents, startTime);
+ break;
+ }
}
Thread.sleep(100);
@@ -82,9 +92,15 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter {
return respondents;
}
- private String barrierCompletedMessage(List<String> respondents, Instant startTime) {
- return barrierPath + " completed in " + Duration.between(startTime, Instant.now()).toString() +
- ", " + respondents.size() + "/" + curator.zooKeeperEnsembleCount() + " responded: " + respondents;
+ private void logBarrierCompleted(List<String> respondents, Instant startTime) {
+ Duration duration = Duration.between(startTime, Instant.now());
+ Level level = duration.minus(Duration.ofSeconds(5)).isNegative() ? Level.FINE : Level.INFO;
+ log.log(level, () -> barrierCompletedMessage(respondents, duration));
+ }
+
+ private String barrierCompletedMessage(List<String> respondents, Duration duration) {
+ return barrierPath + " completed in " + duration.toString() +
+ ", " + respondents.size() + "/" + curator.zooKeeperEnsembleCount() + " responded: " + respondents;
}
@Override
@@ -106,10 +122,18 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter {
}
public static Curator.CompletionWaiter create(Curator curator, Path barrierPath, String id) {
- return new CuratorCompletionWaiter(curator, barrierPath.getAbsolute(), id, Clock.systemUTC());
+ return create(curator, barrierPath, id, waitForAllDefault);
+ }
+
+ public static Curator.CompletionWaiter create(Curator curator, Path barrierPath, String id, Duration waitForAll) {
+ return new CuratorCompletionWaiter(curator, barrierPath.getAbsolute(), id, Clock.systemUTC(), waitForAll);
}
public static Curator.CompletionWaiter createAndInitialize(Curator curator, Path parentPath, String waiterNode, String id) {
+ return createAndInitialize(curator, parentPath, waiterNode, id, waitForAllDefault);
+ }
+
+ public static Curator.CompletionWaiter createAndInitialize(Curator curator, Path parentPath, String waiterNode, String id, Duration waitForAll) {
Path waiterPath = parentPath.append(waiterNode);
String debugMessage = log.isLoggable(Level.FINE) ? "Recreating ZK path " + waiterPath : null;
@@ -120,7 +144,7 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter {
if (debugMessage != null) log.fine(debugMessage + ": Done");
- return new CuratorCompletionWaiter(curator, waiterPath.getAbsolute(), id, Clock.systemUTC());
+ return new CuratorCompletionWaiter(curator, waiterPath.getAbsolute(), id, Clock.systemUTC(), waitForAll);
}
private int barrierMemberCount() {
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
index c8566015ea1..e5810763bf2 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
@@ -84,13 +84,10 @@ import org.apache.curator.retry.RetryForever;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.BadVersionException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;