summaryrefslogtreecommitdiffstats
path: root/zkfacade
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2022-10-07 10:48:51 +0200
committerjonmv <venstad@gmail.com>2022-10-10 12:33:57 +0200
commit5737387f8c0dd414df16933eb75b944c25d1bb8d (patch)
tree2601402afb0aed00e579fd09a935a4406d7b8208 /zkfacade
parent990a561813f0ff23e13c4571d00c2315c866497f (diff)
Expand ZK API, and implement in CuratorWrapper
Diffstat (limited to 'zkfacade')
-rw-r--r--zkfacade/abi-spec.json78
-rw-r--r--zkfacade/pom.xml6
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java2
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java79
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java331
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java94
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java91
-rw-r--r--zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java245
8 files changed, 923 insertions, 3 deletions
diff --git a/zkfacade/abi-spec.json b/zkfacade/abi-spec.json
index d227f8490dc..6fc96e32b18 100644
--- a/zkfacade/abi-spec.json
+++ b/zkfacade/abi-spec.json
@@ -1,4 +1,70 @@
{
+ "com.yahoo.vespa.curator.api.AbstractSingletonWorker": {
+ "superClass": "com.yahoo.component.AbstractComponent",
+ "interfaces": [
+ "com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker"
+ ],
+ "attributes": [
+ "public",
+ "abstract"
+ ],
+ "methods": [
+ "public void <init>()",
+ "protected java.lang.String id()",
+ "protected final void register(com.yahoo.vespa.curator.api.VespaCurator, java.time.Duration)",
+ "protected final void unregister(java.time.Duration)"
+ ],
+ "fields": []
+ },
+ "com.yahoo.vespa.curator.api.VespaCurator$Data": {
+ "superClass": "java.lang.Record",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "final",
+ "record"
+ ],
+ "methods": [
+ "public void <init>(com.yahoo.vespa.curator.api.VespaCurator$Meta, byte[])",
+ "public final java.lang.String toString()",
+ "public final int hashCode()",
+ "public final boolean equals(java.lang.Object)",
+ "public com.yahoo.vespa.curator.api.VespaCurator$Meta meta()",
+ "public byte[] data()"
+ ],
+ "fields": []
+ },
+ "com.yahoo.vespa.curator.api.VespaCurator$Meta": {
+ "superClass": "java.lang.Record",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "final",
+ "record"
+ ],
+ "methods": [
+ "public void <init>(int)",
+ "public final java.lang.String toString()",
+ "public final int hashCode()",
+ "public final boolean equals(java.lang.Object)",
+ "public int version()"
+ ],
+ "fields": []
+ },
+ "com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "interface",
+ "abstract"
+ ],
+ "methods": [
+ "public abstract void activate()",
+ "public abstract void deactivate()"
+ ],
+ "fields": []
+ },
"com.yahoo.vespa.curator.api.VespaCurator": {
"superClass": "java.lang.Object",
"interfaces": [],
@@ -8,7 +74,17 @@
"abstract"
],
"methods": [
- "public abstract java.lang.AutoCloseable lock(com.yahoo.path.Path, java.time.Duration)"
+ "public abstract java.util.Optional stat(com.yahoo.path.Path)",
+ "public abstract java.util.Optional read(com.yahoo.path.Path)",
+ "public abstract com.yahoo.vespa.curator.api.VespaCurator$Meta write(com.yahoo.path.Path, byte[])",
+ "public abstract java.util.Optional write(com.yahoo.path.Path, byte[], int)",
+ "public abstract void delete(com.yahoo.path.Path)",
+ "public abstract boolean delete(com.yahoo.path.Path, int)",
+ "public abstract java.util.List list(com.yahoo.path.Path)",
+ "public abstract java.lang.AutoCloseable lock(com.yahoo.path.Path, java.time.Duration)",
+ "public abstract java.util.concurrent.Future registerSingleton(java.lang.String, com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker)",
+ "public abstract java.util.concurrent.Future unregisterSingleton(com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker)",
+ "public abstract boolean isActive(java.lang.String)"
],
"fields": []
}
diff --git a/zkfacade/pom.xml b/zkfacade/pom.xml
index 17be17d8302..41053585185 100644
--- a/zkfacade/pom.xml
+++ b/zkfacade/pom.xml
@@ -13,6 +13,12 @@
<version>8-SNAPSHOT</version>
<dependencies>
<dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>testutil</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
index 134450edd9a..6c0084a698e 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
@@ -64,7 +64,7 @@ public class Curator extends AbstractComponent implements AutoCloseable {
private static final File ZK_CLIENT_CONFIG_FILE = new File(Defaults.getDefaults().underVespaHome("conf/zookeeper/zookeeper-client.cfg"));
// Note that session timeout has min and max values are related to tickTime defined by server, see configserver.def
- private static final Duration ZK_SESSION_TIMEOUT = Duration.ofSeconds(120);
+ static final Duration ZK_SESSION_TIMEOUT = Duration.ofSeconds(120);
private static final Duration ZK_CONNECTION_TIMEOUT = Duration.ofSeconds(30);
private static final Duration BASE_SLEEP_TIME = Duration.ofSeconds(1);
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java
index 64c1a023a91..66380cfd5f4 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java
@@ -25,10 +25,69 @@ public class CuratorWrapper extends AbstractComponent implements VespaCurator {
static final Path userRoot = Path.fromString("user");
private final Curator curator;
+ private final SingletonManager singletons;
@Inject
public CuratorWrapper(Curator curator) {
+ this(curator, Clock.systemUTC(), Duration.ofSeconds(1));
+ }
+
+ CuratorWrapper(Curator curator, Clock clock, Duration tickTimeout) {
this.curator = curator;
+ this.singletons = new SingletonManager(curator, clock, tickTimeout);
+
+ curator.framework().getConnectionStateListenable().addListener((curatorFramework, connectionState) -> {
+ if (connectionState == ConnectionState.LOST) singletons.invalidate();
+ });
+ }
+
+ @Override
+ public Optional<Meta> stat(Path path) {
+ return curator.getStat(userRoot.append(path)).map(stat -> new Meta(stat.getVersion()));
+ }
+
+ @Override
+ public Optional<Data> read(Path path) {
+ Stat stat = new Stat();
+ return curator.getData(userRoot.append(path), stat).map(data -> new Data(new Meta(stat.getVersion()), data));
+ }
+
+ @Override
+ public Meta write(Path path, byte[] data) {
+ return new Meta(curator.set(userRoot.append(path), data).getVersion());
+ }
+
+ @Override
+ public Optional<Meta> write(Path path, byte[] data, int expectedVersion) {
+ try {
+ return Optional.of(new Meta(curator.set(userRoot.append(path), data, expectedVersion).getVersion()));
+ }
+ catch (RuntimeException e) {
+ if (e.getCause() instanceof BadVersionException) return Optional.empty();
+ throw e;
+ }
+ }
+
+ @Override
+ public void delete(Path path) {
+ curator.delete(userRoot.append(path));
+ }
+
+ @Override
+ public boolean delete(Path path, int expectedVersion) {
+ try {
+ curator.delete(userRoot.append(path), expectedVersion);
+ return true;
+ }
+ catch (RuntimeException e) {
+ if (e.getCause() instanceof BadVersionException) return false;
+ throw e;
+ }
+ }
+
+ @Override
+ public List<String> list(Path path) {
+ return curator.getChildren(userRoot.append(path));
}
@Override
@@ -40,4 +99,24 @@ public class CuratorWrapper extends AbstractComponent implements VespaCurator {
return () -> { try(old) { current.close(); } };
}
+ @Override
+ public CompletableFuture<?> registerSingleton(String singletonId, SingletonWorker singleton) {
+ return singletons.register(singletonId, singleton);
+ }
+
+ @Override
+ public CompletableFuture<?> unregisterSingleton(SingletonWorker singleton) {
+ return singletons.unregister(singleton);
+ }
+
+ @Override
+ public boolean isActive(String singletonId) {
+ return singletons.isActive(singletonId);
+ }
+
+ @Override
+ public void deconstruct() {
+ singletons.close();
+ }
+
}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java
new file mode 100644
index 00000000000..72f0fecb695
--- /dev/null
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java
@@ -0,0 +1,331 @@
+package com.yahoo.vespa.curator;
+
+import com.yahoo.path.Path;
+import com.yahoo.protect.Process;
+import com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages {@link com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker}.
+ *
+ * @author jonmv
+ */
+class SingletonManager implements AutoCloseable {
+
+ private static final Logger logger = Logger.getLogger(SingletonManager.class.getName());
+
+ private final Curator curator;
+ private final Clock clock;
+ private final Duration tickTimeout;
+ private final Map<String, Janitor> janitors = new HashMap<>();
+ private final Map<String, Integer> count = new HashMap<>();
+ private final Map<SingletonWorker, String> registrations = new HashMap<>();
+
+ SingletonManager(Curator curator, Clock clock, Duration tickTimeout) {
+ this.curator = curator;
+ this.clock = clock;
+ this.tickTimeout = tickTimeout;
+ }
+
+ synchronized CompletableFuture<?> register(String singletonId, SingletonWorker singleton) {
+ if (singletonId.contains("/") || singletonId.contains("..")) {
+ throw new IllegalArgumentException("singleton ID may not contain '/' or '..', but got " + singletonId);
+ }
+ String old = registrations.putIfAbsent(singleton, singletonId);
+ if (old != null) throw new IllegalArgumentException(singleton + " already registered with ID " + old);
+ count.merge(singletonId, 1, Integer::sum);
+ return janitors.computeIfAbsent(singletonId, Janitor::new).register(singleton);
+ }
+
+ synchronized CompletableFuture<?> unregister(SingletonWorker singleton) {
+ String id = registrations.remove(singleton);
+ if (id == null) throw new IllegalArgumentException(singleton + " is not registered");
+ return janitors.get(id).unregister(singleton)
+ .whenComplete((__, ___) -> unregistered(id, singleton));
+ }
+
+ synchronized void unregistered(String singletonId, SingletonWorker singleton) {
+ registrations.remove(singleton);
+ if (count.merge(singletonId, -1, Integer::sum) > 0) return;
+ count.remove(singletonId);
+ janitors.remove(singletonId).shutdown();
+ }
+
+ /**
+ * The instant until which this container holds the exclusive lease for activation of singletons with this ID.
+ * The container may abandon the lease early, if deactivation is triggered and completes before the deadline.
+ * Unless connection to the underlying ZK cluster is lost, the returned value will regularly move forwards in time.
+ */
+ Optional<Instant> activeUntil(String singletonId) {
+ return Optional.ofNullable(janitors.get(singletonId)).map(janitor -> janitor.doom.get());
+ }
+
+ /** Whether this container currently holds the activation lease for the given singleton ID. */
+ boolean isActive(String singletonId) {
+ return activeUntil(singletonId).map(clock.instant()::isBefore).orElse(false);
+ }
+
+ /** Invalidate all leases, due to connection loss. */
+ synchronized void invalidate() {
+ for (Janitor janitor : janitors.values()) janitor.invalidate();
+ }
+
+ @Override
+ public synchronized void close() {
+ List<SingletonWorker> registered = List.copyOf(registrations.keySet());
+ for (SingletonWorker singleton : registered) {
+ String id = registrations.get(singleton);
+ logger.log(Level.WARNING, singleton + " still registered with id '" + id + "' at shutdown");
+ unregister(singleton);
+ }
+ }
+
+
+ private class Janitor {
+
+ static class Task {
+
+ enum Type { register, unregister }
+
+ final Type type;
+ final SingletonWorker singleton;
+ final CompletableFuture<?> future = new CompletableFuture<>();
+
+ private Task(Type type, SingletonWorker singleton) {
+ this.type = type;
+ this.singleton = singleton;
+ }
+
+ static Task register(SingletonWorker singleton) { return new Task(Type.register, singleton); }
+ static Task unregister(SingletonWorker singleton) { return new Task(Type.unregister, singleton); }
+
+ }
+
+ private static final Instant INVALID = Instant.ofEpochMilli(-1);
+
+ final BlockingDeque<Task> tasks = new LinkedBlockingDeque<>();
+ final Deque<SingletonWorker> singletons = new ArrayDeque<>(2);
+ final AtomicReference<Instant> doom = new AtomicReference<>();
+ final AtomicBoolean shutdown = new AtomicBoolean();
+ final Thread worker;
+ final String id;
+ final Path path;
+ Lock lock = null;
+ boolean active;
+
+
+ Janitor(String id) {
+ this.id = id;
+ this.path = Path.fromString("/vespa/singleton/v1/" + id);
+ this.worker = new Thread(this::run, "singleton-janitor-" + id + "-");
+
+ worker.setDaemon(true);
+ worker.start();
+ }
+
+ public void unlock() {
+ doom.set(null);
+ if (lock != null) try {
+ lock.close();
+ lock = null;
+ }
+ catch (Exception e) {
+ logger.log(Level.WARNING, "failed closing " + lock, e);
+ }
+ }
+
+ @SuppressWarnings("fallthrough")
+ private void run() {
+ try {
+ while ( ! shutdown.get()) {
+ try {
+ // Attempt to acquire the lock, and extend our doom.
+ renewLease();
+ // Ensure activation status is set accordingly to doom, or clear state if this fails.
+ updateStatus();
+ // Process the next pending, externally triggered task, if any.
+ doTask();
+ }
+ catch (InterruptedException e) {
+ if ( ! shutdown.get()) {
+ logger.log(Level.WARNING, worker + " interrupted, restarting event loop");
+ }
+ }
+ }
+ for (Task task : tasks) task.future.cancel(true);
+ unlock();
+ }
+ catch (Throwable t) {
+ Process.logAndDie(worker + " can't continue, shutting down", t);
+ }
+ }
+
+ protected void doTask() throws InterruptedException {
+ Task task = tasks.poll(tickTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ try {
+ if (task != null) switch (task.type) {
+ case register -> {
+ doRegister(task.singleton);
+ task.future.complete(null);
+ }
+ case unregister -> {
+ doUnregister(task.singleton);
+ task.future.complete(null);
+ }
+ default -> throw new AssertionError("unknown task type '" + task.type + "'");
+ }
+ }
+ catch (RuntimeException e) {
+ logger.log(Level.WARNING, "Uncaught exception in " + worker, e);
+ task.future.completeExceptionally(e);
+ }
+ }
+
+ private void doRegister(SingletonWorker singleton) {
+ SingletonWorker current = singletons.peek();
+ singletons.push(singleton);
+ if (active) {
+ RuntimeException e = null;
+ if (current != null) try {
+ current.deactivate();
+ }
+ catch (RuntimeException f) {
+ e = f;
+ }
+ try {
+ singleton.activate();
+ }
+ catch (RuntimeException f) {
+ if (e == null) e = f;
+ else e.addSuppressed(f);
+ }
+ if (e != null) throw e;
+ }
+ }
+
+ private void doUnregister(SingletonWorker singleton) {
+ RuntimeException e = null;
+ SingletonWorker current = singletons.peek();
+ if ( ! singletons.remove(singleton)) return;
+ if (active && current == singleton) {
+ try {
+ singleton.deactivate();
+ }
+ catch (RuntimeException f) {
+ e = f;
+ }
+ if ( ! singletons.isEmpty()) try {
+ singletons.peek().activate();
+ }
+ catch (RuntimeException f) {
+ if (e == null) e = f;
+ else e.addSuppressed(f);
+ }
+ }
+ if (singletons.isEmpty()) {
+ unlock();
+ }
+ if (e != null) throw e;
+ }
+
+ /**
+ * Attempt to acquire the lock, if not held.
+ * If lock is held, or acquired, ping the ZK cluster to extend our deadline.
+ */
+ private void renewLease() {
+ if (doom.get() == INVALID) {
+ unlock();
+ }
+ // Witness value to detect if invalidation occurs between here and successful ping.
+ Instant ourDoom = doom.get();
+ Instant start = clock.instant();
+ if (lock == null) try {
+ lock = curator.lock(path.append("lock"), tickTimeout);
+ }
+ catch (RuntimeException e) {
+ logger.log(Level.FINE, "Failed acquiring lock for '" + path + "' within " + tickTimeout, e);
+ return;
+ }
+ try {
+ curator.set(path.append("ping"), new byte[0]);
+ }
+ catch (RuntimeException e) {
+ logger.log(Level.FINE, "Failed pinging ZK cluster", e);
+ return;
+ }
+ if ( ! doom.compareAndSet(ourDoom, start.plus(Curator.ZK_SESSION_TIMEOUT.multipliedBy(9).dividedBy(10)))) {
+ logger.log(Level.FINE, "Deadline changed, invalidating current lease renewal");
+ }
+ }
+
+ /**
+ * Attempt to activate or deactivate if status has changed.
+ * If activation fails, we release the lock, to a different container may acquire it.
+ */
+ private void updateStatus() {
+ Instant ourDoom = doom.get();
+ boolean shouldBeActive = ourDoom != null && ! clock.instant().isAfter(ourDoom);
+ if ( ! active && shouldBeActive) {
+ try {
+ active = true;
+ if ( ! singletons.isEmpty()) singletons.peek().activate();
+ }
+ catch (RuntimeException e) {
+ logger.log(Level.WARNING, "Failed to activate " + singletons.peek() + ", deactivating again", e);
+ shouldBeActive = false;
+ }
+ }
+ if (active && ! shouldBeActive) {
+ try {
+ active = false;
+ if ( ! singletons.isEmpty()) singletons.peek().deactivate();
+ }
+ catch (RuntimeException e) {
+ logger.log(Level.WARNING, "Failed to deactivate " + singletons.peek(), e);
+ }
+ unlock();
+ }
+ }
+
+ CompletableFuture<?> register(SingletonWorker singleton) {
+ Task task = Task.register(singleton);
+ tasks.offer(task);
+ return task.future;
+ }
+
+ CompletableFuture<?> unregister(SingletonWorker singleton) {
+ Task task = Task.unregister(singleton);
+ tasks.offer(task);
+ return task.future;
+ }
+
+ void invalidate() {
+ doom.set(INVALID);
+ }
+
+ void shutdown() {
+ if ( ! shutdown.compareAndSet(false, true)) {
+ logger.log(Level.WARNING, "Shutdown called more than once on " + this);
+ }
+ }
+
+ }
+
+}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java
new file mode 100644
index 00000000000..be0b01e732b
--- /dev/null
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java
@@ -0,0 +1,94 @@
+package com.yahoo.vespa.curator.api;
+
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.concurrent.UncheckedTimeoutException;
+import com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker;
+import com.yahoo.yolean.UncheckedInterruptedException;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Super-class for implementations of {@link SingletonWorker}.
+ * Users should call {@link VespaCurator#registerSingleton} at construction, and
+ * {@link VespaCurator#unregisterSingleton} at deconstruction.
+ * If this fails activation on registration, it will immediately unregister as well, before propagating the error.
+ * Consequently, registering this on construction will allow detecting a failed component generation, and instead
+ * retain the previous generation, provided enough verification is done in {@link #activate()}.
+ * The default ID to use with registration is the concrete class name, e.g., {@code my.example.Singleton}.
+ *
+ * @author jonmv
+ */
+public abstract class AbstractSingletonWorker extends AbstractComponent implements SingletonWorker {
+
+ private final AtomicReference<VespaCurator> owner = new AtomicReference<>();
+
+ /**
+ * The singleton ID to use when registering this with a {@link VespaCurator}.
+ * At most one singleton worker with the given ID will be active, in the cluster, at any time.
+ * {@link VespaCurator#isActive(String)} may be polled to see whether this container is currently
+ * allowed to have an active singleton with the given ID.
+ */
+ protected String id() { return getClass().getName(); }
+
+ /**
+ * Call this at the end of construction of the child or owner.
+ * If this activates the singleton, this happens synchronously, and any errors are propagated here.
+ * If this replaces an already active singleton, its deactivation is also called, prior to activation of this.
+ * If (de)activation is not complete within the given timeout, a timeout exception is thrown.
+ * If an error occurs (due to failed activation), unregistration is automatically attempted, with the same timeout.
+ */
+ protected final void register(VespaCurator curator, Duration timeout) {
+ if ( ! owner.compareAndSet(null, curator)) {
+ throw new IllegalArgumentException(this + " is already registered with " + owner.get());
+ }
+ try {
+ await(curator.registerSingleton(id(), this), timeout, "register");
+ }
+ catch (RuntimeException e) {
+ try {
+ unregister(timeout);
+ }
+ catch (Exception f) {
+ e.addSuppressed(f);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Call this at the start of deconstruction of the child!
+ * If this singleton is active, deactivation will be called synchronously, and errors propagated here.
+ * If this also triggers activation of a new singleton, its activation is called after deactivation of this.
+ * If (de)activation is not complete within the given timeout, a timeout exception is thrown.
+ */
+ protected final void unregister(Duration timeout) {
+ VespaCurator curator = owner.getAndSet(null);
+ if (curator == null) {
+ throw new IllegalArgumentException(this + " was not registered with any owners");
+ }
+ await(curator.unregisterSingleton(this), timeout, "unregister");
+ }
+
+ private void await(Future<?> future, Duration timeout, String verb) {
+ try {
+ future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ future.cancel(true);
+ throw new UncheckedInterruptedException("interrupted while " + verb + "ing " + this, e, true);
+ }
+ catch (TimeoutException e) {
+ future.cancel(true);
+ throw new UncheckedTimeoutException("timed out while " + verb + "ing " + this, e);
+ }
+ catch (ExecutionException e) {
+ throw new RuntimeException("failed to " + verb + " " + this, e.getCause());
+ }
+ }
+
+}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java
index 32aef2da118..19b86f1c191 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java
@@ -5,16 +5,105 @@ import com.yahoo.concurrent.UncheckedTimeoutException;
import com.yahoo.path.Path;
import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
/**
* A client for a ZooKeeper cluster running inside Vespa. Applications that want to use ZooKeeper can inject this in
* their code.
*
* @author mpolden
+ * @author jonmv
*/
public interface VespaCurator {
- /** Create and acquire a re-entrant lock in given path. This blocks until the lock is acquired or timeout elapses. */
+ /** Returns the stat for the node at the given path, or empty if no node exists at that path. */
+ Optional<Meta> stat(Path path);
+
+ /** Returns the content and stat for the node at the given path, or empty if no node exists at that path. */
+ Optional<Data> read(Path path);
+
+ /** Writes the given data to a node at the given path, creating it and its parents as needed, and returns the stat of the modified node. */
+ Meta write(Path path, byte[] data);
+
+ /**
+ * Atomically compares the version in the stat of the node at the given path, with the expected version, and then:
+ * if they are equal, performs the write operation (see {@link #write(Path, byte[])});
+ * otherwise, return empty.
+ */
+ Optional<Meta> write(Path path, byte[] data, int expectedVersion);
+
+ /** Recursively deletes any node at the given path, and any children it may have. */
+ void delete(Path path);
+
+ /**
+ * Atomically compares the version in the stat of the node at the given path, with the expected version, and then:
+ * if they are equal, performs the recursive delete operation (see {@link #delete(Path)}), and returns {@code} true;
+ * otherwise, returns {@code false}.
+ */
+ boolean delete(Path path, int expectedVersion);
+
+ List<String> list(Path path);
+
+ /** Creates and acquires a re-entrant lock with the given path. This blocks until the lock is acquired or timeout elapses. */
AutoCloseable lock(Path path, Duration timeout) throws UncheckedTimeoutException;
+ /** Data of a ZK node, including content (possibly empty, never {@code null}) and metadata. */
+ record Data(Meta meta, byte[] data) { }
+
+ /** Metadata for a ZK node. */
+ record Meta(int version) { }
+
+ /**
+ * Register the singleton with the framework, so it may become active, and returns a
+ * synchronisation handle to any deactivations or activations triggered by this.
+ * If there is already another active singleton with the given ID (in this JVM),
+ * that will be deactivated before the new one is activated.
+ */
+ Future<?> registerSingleton(String singletonId, SingletonWorker singleton);
+
+ /**
+ * Unregister with the framework, so this singleton may no longer be active, and returns
+ * a synchronisation handle to any deactivation or activation triggered by this.
+ * If this is the last singleton registered with its ID, then this container immediately releases
+ * the activation lease for that ID, so another container may acquire it.
+ */
+ Future<?> unregisterSingleton(SingletonWorker singleton);
+
+ /**
+ * Whether this container currently holds te exclusive lease for activation of singletons with this ID.
+ */
+ boolean isActive(String singletonId);
+
+ /**
+ * Callback interface for processes of which only a single instance should be active at any time, across all
+ * containers in the cluster, and across all component generations. Notes:
+ * <ul>
+ * <li>{@link #activate()} is called by the system on a singleton whenever it is the newest registered
+ * singleton in this container, and this container has the lease for the ID with which the singleton
+ * was registered. See {@link #registerSingleton} and {@link #isActive}.</li>
+ * <li>{@link #deactivate()} is called by the system on a singleton which is currently active whenever
+ * the above no longer holds. See {@link #unregisterSingleton}.</li>
+ * <li>Callbacks for the same ID are always invoked by the same thread, in serial,
+ * which means the implemented callbacks must return in a timely manner.</li>
+ * <li>If activation of a singleton, as a result of the container acquiring the lease at some tpoint,</li>
+ * </ul>
+ * See {@link AbstractSingletonWorker} for an abstract superclass to use for implementations.
+ */
+ interface SingletonWorker {
+
+ /**
+ * Called by the system whenever this singleton becomes the single active worker.
+ * If this is triggered because the container obtains the activation lease, and activation fails,
+ * then the container immediately releases the lease, so another container may acquire it instead.
+ */
+ void activate();
+
+ /** Called by the system whenever this singleton is no longer the single active worker. */
+ void deactivate();
+
+ }
+
}
diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java
new file mode 100644
index 00000000000..645d0ae5a37
--- /dev/null
+++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java
@@ -0,0 +1,245 @@
+package com.yahoo.vespa.curator;
+
+import com.yahoo.path.Path;
+import com.yahoo.test.ManualClock;
+import com.yahoo.vespa.curator.api.AbstractSingletonWorker;
+import com.yahoo.vespa.curator.api.VespaCurator;
+import com.yahoo.vespa.curator.api.VespaCurator.Meta;
+import com.yahoo.vespa.curator.mock.MockCurator;
+import com.yahoo.vespa.curator.mock.MockCuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.junit.Test;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author jonmv
+ */
+public class CuratorWrapperTest {
+
+ static final Path lockPath = Path.fromString("/vespa/singleton/v1/singleton/lock");
+
+ @Test
+ public void testUserApi() throws Exception {
+ try (Curator wrapped = new MockCurator()) {
+ CuratorWrapper curator = new CuratorWrapper(wrapped);
+
+ Path path = Path.fromString("path");
+ assertEquals(Optional.empty(), curator.stat(path));
+
+ Meta meta = curator.write(path, "data".getBytes(UTF_8));
+ assertEquals(Optional.of(meta), curator.stat(path));
+
+ assertEquals("data", new String(curator.read(path).get().data(), UTF_8));
+ assertEquals(meta, curator.read(path).get().meta());
+
+ assertEquals(Optional.empty(), curator.write(path, new byte[0], 0));
+
+ meta = curator.write(path, new byte[0], meta.version()).get();
+ assertEquals(3, meta.version());
+
+ assertEquals(List.of("path"), curator.list(Path.createRoot()));
+
+ assertFalse(curator.delete(path, 0));
+
+ curator.delete(path, 3);
+
+ assertEquals(List.of(), curator.list(Path.createRoot()));
+
+ try (AutoCloseable lock = curator.lock(path, Duration.ofSeconds(1))) {
+ assertEquals(List.of("user", "path"), wrapped.getChildren(Path.createRoot()));
+ assertEquals(List.of("path"), wrapped.getChildren(CuratorWrapper.userRoot));
+ }
+
+ try (AutoCloseable lock = curator.lock(path, Duration.ofSeconds(1))) {
+ // Both previous locks were released.
+ }
+ }
+ }
+
+ @Test
+ public void testSingleSingleton() {
+ try (Curator wrapped = new MockCurator()) {
+ Phaser stunning = new Phaser(1);
+ ManualClock clock = new ManualClock() {
+ @Override public Instant instant() {
+ stunning.arriveAndAwaitAdvance();
+ // Let test thread advance time when desired.
+ stunning.arriveAndAwaitAdvance();
+ return super.instant();
+ };
+ };
+ CuratorWrapper curator = new CuratorWrapper(wrapped, clock, Duration.ofMillis(100));
+
+ // First singleton to register becomes active during construction.
+ Singleton singleton = new Singleton(curator);
+ assertTrue(singleton.isActive);
+ assertTrue(wrapped.exists(lockPath));
+ assertTrue(curator.isActive(singleton.id()));
+ singleton.deconstruct();
+ assertFalse(singleton.isActive);
+ // ... and deactivated as a result of unregistering again.
+
+ // Singleton can be set up again, but this time, time runs away.
+ singleton = new Singleton(curator) {
+ @Override public void activate() {
+ // Set up sync in clock on next renewLease.
+ super.activate();
+ stunning.register();
+ }
+ };
+ assertTrue(singleton.isActive);
+
+ stunning.arriveAndAwaitAdvance(); // Wait for next renewLease.
+ stunning.arriveAndAwaitAdvance(); // Let next renewLease complete.
+ stunning.arriveAndAwaitAdvance(); // Wait for next updateStatus.
+ clock.advance(Curator.ZK_SESSION_TIMEOUT);
+ singleton.phaser.register(); // Set up so we can synchronise with deactivation.
+ stunning.arriveAndDeregister(); // Let lease expire, and ensure further ticks complete if we lose the race to unregister.
+
+ singleton.phaser.arriveAndAwaitAdvance();
+ assertFalse(singleton.isActive);
+
+ // Singleton is reactivated next tick.
+ singleton.phaser.arriveAndAwaitAdvance();
+ assertTrue(singleton.isActive);
+
+ // Manager unregisters remaining singletons on shutdown.
+ curator.deconstruct();
+ singleton.phaser.arriveAndAwaitAdvance();
+ }
+ }
+
+ @Test
+ public void testSingletonsInSameContainer() {
+ try (Curator wrapped = new MockCurator()) {
+ CuratorWrapper curator = new CuratorWrapper(wrapped);
+
+ // First singleton to register becomes active during construction.
+ Singleton singleton = new Singleton(curator) { public String toString() { return "S 1"; }};
+ assertTrue(singleton.isActive);
+ assertTrue(wrapped.exists(lockPath));
+ assertTrue(curator.isActive(singleton.id()));
+
+ Singleton newSingleton = new Singleton(curator);
+ assertTrue(newSingleton.isActive);
+ assertFalse(singleton.isActive);
+
+ Singleton newerSingleton = new Singleton(curator);
+ assertTrue(newerSingleton.isActive);
+ assertFalse(newSingleton.isActive);
+ assertFalse(singleton.isActive);
+
+ singleton.deconstruct();
+ assertTrue(newerSingleton.isActive);
+ assertFalse(newSingleton.isActive);
+ assertFalse(singleton.isActive);
+
+ newerSingleton.deconstruct();
+ assertFalse(newerSingleton.isActive);
+ assertTrue(newSingleton.isActive);
+ assertFalse(singleton.isActive);
+
+ // Add a singleton which fails activation.
+ Phaser stunning = new Phaser(2);
+ AtomicReference<String> thrownMessage = new AtomicReference<>();
+ new Thread(() -> {
+ RuntimeException e = assertThrows(RuntimeException.class,
+ () -> new Singleton(curator) {
+ @Override public void activate() {
+ throw new RuntimeException();
+ }
+ @Override public void deactivate() {
+ stunning.arriveAndAwaitAdvance();
+ stunning.arriveAndAwaitAdvance();
+ throw new RuntimeException();
+ }
+ @Override public String toString() {
+ return "failing singleton";
+ }
+ });
+ stunning.arriveAndAwaitAdvance();
+ thrownMessage.set(e.getMessage());
+ }).start();
+
+ stunning.arriveAndAwaitAdvance(); // Failing component is about to be deactivated.
+ assertFalse(newSingleton.isActive);
+ assertTrue(curator.isActive(newSingleton.id())); // No actual active components, but container has the lease.
+ stunning.arriveAndAwaitAdvance(); // Failing component is done being deactivated.
+ stunning.arriveAndAwaitAdvance(); // Failing component is done cleaning up after itself.
+ assertTrue(newSingleton.isActive);
+ assertEquals("failed to register failing singleton", thrownMessage.get());
+ newSingleton.deconstruct();
+
+ curator.deconstruct();
+ }
+ }
+
+ @Test
+ public void testSingletonsInDifferentContainers() {
+ try (MockCurator wrapped = new MockCurator()) {
+ CuratorWrapper curator = new CuratorWrapper(wrapped, Clock.systemUTC(), Duration.ofMillis(100));
+
+ // Simulate a different container holding the lock.
+ Singleton singleton;
+ try (Lock lock = wrapped.lock(lockPath, Duration.ofSeconds(1))) {
+ singleton = new Singleton(curator);
+ assertFalse(singleton.isActive);
+ assertFalse(curator.isActive(singleton.id()));
+ singleton.phaser.register();
+ }
+
+ singleton.phaser.arriveAndAwaitAdvance();
+ assertTrue(curator.isActive(singleton.id()));
+ assertTrue(singleton.isActive);
+
+ // Simulate a different container wanting the lock.
+ Phaser stunning = new Phaser(2);
+ new Thread(() -> {
+ try (Lock lock = wrapped.lock(lockPath, Duration.ofSeconds(2))) {
+ stunning.arriveAndAwaitAdvance();
+ stunning.arriveAndAwaitAdvance();
+ }
+ }).start();
+
+ // Simulate connection loss for our singleton's ZK session.
+ ((MockCuratorFramework) wrapped.framework()).connectionStateListeners.listeners.forEach(listener -> listener.stateChanged(null, ConnectionState.LOST));
+ stunning.arriveAndAwaitAdvance();
+ singleton.phaser.arriveAndAwaitAdvance();
+ assertFalse(singleton.isActive);
+
+ // Connection is restored, and the other container releases the lock again.
+ stunning.arriveAndAwaitAdvance();
+ singleton.phaser.arriveAndAwaitAdvance();
+ assertTrue(singleton.isActive);
+ singleton.phaser.arriveAndDeregister();
+ singleton.deconstruct();
+ assertFalse(singleton.isActive);
+
+ curator.deconstruct();
+ }
+ }
+
+ static class Singleton extends AbstractSingletonWorker {
+ Singleton(VespaCurator curator) { register(curator, Duration.ofSeconds(2)); }
+ boolean isActive;
+ Phaser phaser = new Phaser(1);
+ @Override protected String id() { return "singleton"; } // ... lest anonymous subclasses get different IDs ... ƪ(`▿▿▿▿´ƪ)
+ @Override public void activate() { isActive = true; phaser.arriveAndAwaitAdvance(); }
+ @Override public void deactivate() { isActive = false; phaser.arriveAndAwaitAdvance(); }
+ @Override public void deconstruct() { unregister(Duration.ofSeconds(2)); }
+ }
+
+}