diff options
author | HÃ¥kon Hallingstad <hakon.hallingstad@gmail.com> | 2023-09-18 11:25:09 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-18 11:25:09 +0200 |
commit | 3f5a467de3c80e260e91fa52b0249b350c06fd1b (patch) | |
tree | 9ff8d446e4a92a084bd7ab2e5974d8af499ff125 | |
parent | 100f2608bee853ea17ab543c3a8bfcfce4a3ac39 (diff) | |
parent | 50b7f332b3dc12d3ff909971c84fb9f82b075d22 (diff) |
Merge pull request #28553 from vespa-engine/hmusum/refactor-completion-waiter
Hmusum/refactor completion waiter
5 files changed, 23 insertions, 34 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionZooKeeperClient.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionZooKeeperClient.java index 85abd937ac0..2bc8cb5bc0a 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionZooKeeperClient.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionZooKeeperClient.java @@ -118,9 +118,9 @@ public class SessionZooKeeperClient { public long sessionId() { return sessionId; } - public CompletionWaiter createActiveWaiter() { return createCompletionWaiter(ACTIVE_BARRIER); } + public CompletionWaiter createActiveWaiter() { return createCompletionWaiter(getWaiterPath(ACTIVE_BARRIER)); } - CompletionWaiter createPrepareWaiter() { return createCompletionWaiter(PREPARE_BARRIER); } + CompletionWaiter createPrepareWaiter() { return createCompletionWaiter(getWaiterPath(PREPARE_BARRIER)); } CompletionWaiter getPrepareWaiter() { return getCompletionWaiter(getWaiterPath(PREPARE_BARRIER)); } @@ -136,8 +136,8 @@ public class SessionZooKeeperClient { return sessionPath.append(barrierName); } - private CompletionWaiter createCompletionWaiter(String waiterNode) { - return curator.createCompletionWaiter(sessionPath, waiterNode, serverId, barrierWaitForAllTimeout); + private CompletionWaiter createCompletionWaiter(Path path) { + return curator.createCompletionWaiter(path, serverId, barrierWaitForAllTimeout); } private CompletionWaiter getCompletionWaiter(Path path) { diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java index 2781e81cd7c..80646dc5607 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java @@ -177,8 +177,8 @@ public class Curator extends AbstractComponent implements AutoCloseable { return CuratorCompletionWaiter.create(this, waiterPath, id, waitForAll); } - public CompletionWaiter createCompletionWaiter(Path parentPath, String waiterNode, String id, Duration waitForAll) { - return CuratorCompletionWaiter.createAndInitialize(this, parentPath, waiterNode, id, waitForAll); + public CompletionWaiter createCompletionWaiter(Path waiterPath, String id, Duration waitForAll) { + return CuratorCompletionWaiter.createAndInitialize(this, waiterPath, id, waitForAll); } /** Creates a listenable cache which keeps in sync with changes to all the immediate children of a path */ diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java index 5a3d6668231..7d918baaf54 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java @@ -6,10 +6,11 @@ import com.yahoo.path.Path; import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.List; import java.util.logging.Level; +import static com.yahoo.vespa.curator.Curator.CompletionWaiter; + /** * Implementation of a Barrier that handles the case where more than number of members can call synchronize. * Will wait for some time for all servers to do the operation, but will accept the majority of servers to have @@ -18,18 +19,18 @@ import java.util.logging.Level; * @author Vegard Havdal * @author Ulf Lilleengen */ -class CuratorCompletionWaiter implements Curator.CompletionWaiter { +class CuratorCompletionWaiter implements CompletionWaiter { private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(CuratorCompletionWaiter.class.getName()); private final Curator curator; - private final String barrierPath; - private final String myId; + private final Path barrierPath; + private final Path waiterNode; private final Clock clock; private final Duration waitForAll; - CuratorCompletionWaiter(Curator curator, String barrierPath, String myId, Clock clock, Duration waitForAll) { - this.myId = barrierPath + "/" + myId; + CuratorCompletionWaiter(Curator curator, Path barrierPath, String serverId, Clock clock, Duration waitForAll) { + this.waiterNode = barrierPath.append(serverId); this.curator = curator; this.barrierPath = barrierPath; this.clock = clock; @@ -60,10 +61,9 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter { Instant endTime = startTime.plus(timeout); Instant gotQuorumTime = Instant.EPOCH; - List<String> respondents = new ArrayList<>(); + List<String> respondents; do { - respondents.clear(); - respondents.addAll(curator.framework().getChildren().forPath(barrierPath)); + respondents = curator.framework().getChildren().forPath(barrierPath.getAbsolute()); if (log.isLoggable(Level.FINER)) { log.log(Level.FINER, respondents.size() + "/" + curator.zooKeeperEnsembleCount() + " responded: " + respondents + ", all participants: " + curator.zooKeeperEnsembleConnectionSpec()); @@ -105,37 +105,26 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter { @Override public void notifyCompletion() { try { - notifyInternal(); + curator.framework().create().forPath(waiterNode.getAbsolute()); } catch (Exception e) { throw new RuntimeException(e); } } - private void notifyInternal() throws Exception { - curator.framework().create().forPath(myId); - } - @Override public String toString() { return "'" + barrierPath + "', " + barrierMemberCount() + " members"; } - public static Curator.CompletionWaiter create(Curator curator, Path barrierPath, String id, Duration waitForAll) { - return new CuratorCompletionWaiter(curator, barrierPath.getAbsolute(), id, Clock.systemUTC(), waitForAll); + public static CompletionWaiter create(Curator curator, Path barrierPath, String id, Duration waitForAll) { + return new CuratorCompletionWaiter(curator, barrierPath, id, Clock.systemUTC(), waitForAll); } - public static Curator.CompletionWaiter createAndInitialize(Curator curator, Path parentPath, String waiterNode, String id, Duration waitForAll) { - Path waiterPath = parentPath.append(waiterNode); - - String debugMessage = log.isLoggable(Level.FINE) ? "Recreating ZK path " + waiterPath : null; - if (debugMessage != null) log.fine(debugMessage); - + public static CompletionWaiter createAndInitialize(Curator curator, Path waiterPath, String id, Duration waitForAll) { curator.delete(waiterPath); curator.createAtomically(waiterPath); - if (debugMessage != null) log.fine(debugMessage + ": Done"); - - return new CuratorCompletionWaiter(curator, waiterPath.getAbsolute(), id, Clock.systemUTC(), waitForAll); + return new CuratorCompletionWaiter(curator, waiterPath, id, Clock.systemUTC(), waitForAll); } private int barrierMemberCount() { diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java index e578746d348..592b9fc2a05 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java @@ -87,7 +87,7 @@ public class MockCurator extends Curator { } @Override - public CompletionWaiter createCompletionWaiter(Path parentPath, String waiterNode, String id, Duration waitForAll) { + public CompletionWaiter createCompletionWaiter(Path waiterPath, String id, Duration waitForAll) { return mockFramework().createCompletionWaiter(); } diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorCompletionWaiterTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorCompletionWaiterTest.java index dbdb14b9214..17ec7fb65e9 100644 --- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorCompletionWaiterTest.java +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorCompletionWaiterTest.java @@ -19,7 +19,7 @@ public class CuratorCompletionWaiterTest { @Test public void testCompletionWaiter() throws InterruptedException { Curator curator = new MockCurator(); - Curator.CompletionWaiter waiter = CuratorCompletionWaiter.createAndInitialize(curator, Path.createRoot(), "foo", "foo", waitForAll); + Curator.CompletionWaiter waiter = CuratorCompletionWaiter.createAndInitialize(curator, Path.createRoot().append("foo"), "foo", waitForAll); Curator.CompletionWaiter notifier = CuratorCompletionWaiter.create(curator, Path.fromString("/foo"), "bar", waitForAll); Thread t1 = new Thread(() -> { try { @@ -36,7 +36,7 @@ public class CuratorCompletionWaiterTest { @Test(expected = CompletionTimeoutException.class) public void testCompletionWaiterFailure() { Curator curator = new MockCurator(); - Curator.CompletionWaiter waiter = CuratorCompletionWaiter.createAndInitialize(curator, Path.createRoot(), "foo", "foo", waitForAll); + Curator.CompletionWaiter waiter = CuratorCompletionWaiter.createAndInitialize(curator, Path.createRoot().append("foo"), "foo", waitForAll); waiter.awaitCompletion(Duration.ofMillis(100)); } } |