summary | refs | log | tree | commit | diff | stats
path: root/zkfacade
diff options
context:
space:
mode:
author Harald Musum <musum@verizonmedia.com> 2020-04-29 13:55:13 +0200
committer Harald Musum <musum@verizonmedia.com> 2020-04-29 13:55:13 +0200
commit 9f492628f7fa9c5f59c50d97af37907581bcc015 (patch)
tree 8fdbeea15b290577d04b4725c6e816d7f0420325 /zkfacade
parent a8fc679943ae9bdc6d6d58d82a2b9f43dac54ad4 (diff)
Wait longer for servers to reach barrier
1. Wait up to 2 seconds for all servers to reach the barrier. 2. If they have not, wait up to 4 seconds for the server that waits for the barrier to be one of the respondents AND for a majority of servers to have reached the barrier. 3. If that has not happened either, wait for a majority of servers to have reached the barrier.
Diffstat (limited to 'zkfacade')
-rw-r--r-- zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java 4
-rw-r--r-- zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java 62
-rw-r--r-- zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java 3
-rw-r--r-- zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorCompletionWaiterTest.java 6
4 files changed, 52 insertions, 23 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
index 6d4f6beece1..37b1fa1c9fb 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
@@ -212,11 +212,11 @@ public class Curator implements AutoCloseable {
}
public CompletionWaiter getCompletionWaiter(Path waiterPath, int numMembers, String id) {
- return CuratorCompletionWaiter.create(curatorFramework, waiterPath, numMembers, id);
+ return CuratorCompletionWaiter.create(this, waiterPath, id);
}
public CompletionWaiter createCompletionWaiter(Path parentPath, String waiterNode, int numMembers, String id) {
- return CuratorCompletionWaiter.createAndInitialize(this, parentPath, waiterNode, numMembers, id);
+ return CuratorCompletionWaiter.createAndInitialize(this, parentPath, waiterNode, id);
}
/** Creates a listenable cache which keeps in sync with changes to all the immediate children of a path */
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java
index d71e23a0623..537ec2ae751 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java
@@ -3,7 +3,6 @@ package com.yahoo.vespa.curator;
import java.util.logging.Level;
import com.yahoo.path.Path;
-import org.apache.curator.framework.CuratorFramework;
import java.time.Clock;
import java.time.Duration;
@@ -21,14 +20,12 @@ import java.util.List;
class CuratorCompletionWaiter implements Curator.CompletionWaiter {
private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(CuratorCompletionWaiter.class.getName());
- private final CuratorFramework curator;
+ private final Curator curator;
private final String barrierPath;
private final String myId;
- private final int memberQty;
private final Clock clock;
- CuratorCompletionWaiter(int barrierMembers, CuratorFramework curator, String barrierPath, String myId, Clock clock) {
- this.memberQty = barrierMembers;
+ CuratorCompletionWaiter(Curator curator, String barrierPath, String myId, Clock clock) {
this.myId = barrierPath + "/" + myId;
this.curator = curator;
this.barrierPath = barrierPath;
@@ -45,27 +42,51 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter {
} catch (Exception e) {
throw new RuntimeException(e);
}
- if (respondents.size() < memberQty) {
+ if (respondents.size() < barrierMemberCount()) {
throw new CompletionTimeoutException("Timed out waiting for peer config servers to complete operation. " +
- "Got response from " + respondents + ", but need response from " +
- "at least " + memberQty + " server(s). " +
- "Timeout passed as argument was " + timeout.toMillis() + " ms");
+ "Got response from " + respondents + ", but need response from " +
+ "at least " + barrierMemberCount() + " server(s). " +
+ "Timeout passed as argument was " + timeout.toMillis() + " ms");
}
}
private List<String> awaitInternal(Duration timeout) throws Exception {
- Instant endTime = clock.instant().plus(timeout);
+ Instant startTime = clock.instant();
+ Instant endTime = startTime.plus(timeout);
List<String> respondents;
do {
- respondents = curator.getChildren().forPath(barrierPath);
- if (respondents.size() >= memberQty) {
+ respondents = curator.framework().getChildren().forPath(barrierPath);
+
+ // First, check if all config servers responded
+ if (respondents.size() == curator.zooKeeperEnsembleCount()) {
+ log.log(Level.FINE, barrierCompletedMessage(respondents, startTime));
+ break;
+ }
+ // Then, if some are missing after 2 seconds, allow if the server this code is running on is one of the repondents
+ if (usedMoreTimeThan(Duration.ofSeconds(2), startTime) && respondents.contains(myId) && respondents.size() >= barrierMemberCount()) {
+ log.log(Level.INFO, barrierCompletedMessage(respondents, startTime));
break;
}
+ // If some are still missing after 4 seconds, quorum is enough
+ if (usedMoreTimeThan(Duration.ofSeconds(4), startTime) && respondents.size() >= barrierMemberCount()) {
+ log.log(Level.INFO, barrierCompletedMessage(respondents, startTime));
+ break;
+ }
+
Thread.sleep(100);
} while (clock.instant().isBefore(endTime));
+
return respondents;
}
+ private boolean usedMoreTimeThan(Duration waitTime, Instant startTime) {
+ return clock.instant().isAfter(startTime.plus(waitTime));
+ }
+
+ private String barrierCompletedMessage(List<String> respondents, Instant startTime) {
+ return barrierPath + " completed in " + Duration.between(startTime, Instant.now()).toString() +
+ ", " + respondents.size() + "/" + curator.zooKeeperEnsembleCount() + " responded: " + respondents;
+ }
@Override
public void notifyCompletion() {
@@ -77,22 +98,27 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter {
}
private void notifyInternal() throws Exception {
- curator.create().forPath(myId);
+ curator.framework().create().forPath(myId);
}
@Override
public String toString() {
- return "'" + barrierPath + "', " + memberQty + " members";
+ return "'" + barrierPath + "', " + barrierMemberCount() + " members";
}
- public static Curator.CompletionWaiter create(CuratorFramework curator, Path barrierPath, int numMembers, String id) {
- return new CuratorCompletionWaiter(numMembers, curator, barrierPath.getAbsolute(), id, Clock.systemUTC());
+ public static Curator.CompletionWaiter create(Curator curator, Path barrierPath, String id) {
+ return new CuratorCompletionWaiter(curator, barrierPath.getAbsolute(), id, Clock.systemUTC());
}
- public static Curator.CompletionWaiter createAndInitialize(Curator curator, Path parentPath, String waiterNode, int numMembers, String id) {
+ public static Curator.CompletionWaiter createAndInitialize(Curator curator, Path parentPath, String waiterNode, String id) {
Path waiterPath = parentPath.append(waiterNode);
curator.delete(waiterPath);
curator.createAtomically(parentPath, waiterPath);
- return new CuratorCompletionWaiter(numMembers, curator.framework(), waiterPath.getAbsolute(), id, Clock.systemUTC());
+ return new CuratorCompletionWaiter(curator, waiterPath.getAbsolute(), id, Clock.systemUTC());
}
+
+ private int barrierMemberCount() {
+ return (curator.zooKeeperEnsembleCount() / 2) + 1; // majority
+ }
+
}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
index f5848a07109..8e4a4530493 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
@@ -341,6 +341,9 @@ public class MockCurator extends Curator {
return new MockFileCache(Path.fromString(path));
}
+ @Override
+ public int zooKeeperEnsembleCount() { return 1; }
+
/**
* Invocation of changes to the file system state is abstracted through this to allow transactional
* changes to notify on commit
diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorCompletionWaiterTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorCompletionWaiterTest.java
index d1ce744de7f..54bba4aa619 100644
--- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorCompletionWaiterTest.java
+++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorCompletionWaiterTest.java
@@ -17,8 +17,8 @@ public class CuratorCompletionWaiterTest {
@Test
public void testCompletionWaiter() throws InterruptedException {
Curator curator = new MockCurator();
- Curator.CompletionWaiter waiter = CuratorCompletionWaiter.createAndInitialize(curator, Path.createRoot(), "foo", 1, "foo");
- Curator.CompletionWaiter notifier = CuratorCompletionWaiter.create(curator.framework(), Path.fromString("/foo"), 1, "bar");
+ Curator.CompletionWaiter waiter = CuratorCompletionWaiter.createAndInitialize(curator, Path.createRoot(), "foo", "foo");
+ Curator.CompletionWaiter notifier = CuratorCompletionWaiter.create(curator, Path.fromString("/foo"), "bar");
Thread t1 = new Thread(() -> {
try {
waiter.awaitCompletion(Duration.ofSeconds(120));
@@ -34,7 +34,7 @@ public class CuratorCompletionWaiterTest {
@Test(expected = CompletionTimeoutException.class)
public void testCompletionWaiterFailure() {
Curator curator = new MockCurator();
- Curator.CompletionWaiter waiter = CuratorCompletionWaiter.createAndInitialize(curator, Path.createRoot(), "foo", 1, "foo");
+ Curator.CompletionWaiter waiter = CuratorCompletionWaiter.createAndInitialize(curator, Path.createRoot(), "foo", "foo");
waiter.awaitCompletion(Duration.ofMillis(100));
}
}