summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2022-10-10 13:16:59 +0200
committerGitHub <noreply@github.com>2022-10-10 13:16:59 +0200
commit362a8b6ffc957d8fc425e50ca18fcfe14d4aa974 (patch)
tree457a1c4528fc6ae31b2303da0d488fe8dcbe83f4
parentd0a3ff8edc557295ed4f731fd1967e6ddce3d679 (diff)
parentbaca4a67028ef18f340df7c87b31a480a44938e0 (diff)
Merge pull request #24351 from vespa-engine/jonmv/expanded-zk-API
Jonmv/expanded zk api
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java1
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java1
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilderTest.java1
-rw-r--r--vespajlib/src/main/java/com/yahoo/yolean/UncheckedInterruptedException.java2
-rw-r--r--zkfacade/abi-spec.json79
-rw-r--r--zkfacade/pom.xml6
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java46
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java127
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java330
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java95
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java105
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java122
-rw-r--r--zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java246
-rw-r--r--zookeeper-server/zookeeper-server/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java10
14 files changed, 1116 insertions, 55 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
index d5324a3f0b8..5c369d2508e 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -15,7 +15,6 @@ import com.yahoo.vespa.curator.Lock;
import com.yahoo.yolean.Exceptions;
import java.io.Closeable;
-import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java b/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java
index 5e09dd4732d..307e0e17955 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java
@@ -246,6 +246,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> {
", have " + nonRetiredNodes + " non-retired");
}
cluster.addSimpleComponent("com.yahoo.vespa.curator.Curator", null, "zkfacade");
+ cluster.addSimpleComponent("com.yahoo.vespa.curator.CuratorWrapper", null, "zkfacade");
// These need to be setup so that they will use the container's config id, since each container
// have different config (id of zookeeper server)
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilderTest.java b/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilderTest.java
index 272dfa19f64..cf6b6365792 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilderTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilderTest.java
@@ -570,6 +570,7 @@ public class ContainerModelBuilderTest extends ContainerModelBuilderTestBase {
ApplicationContainerCluster cluster = model.getContainerClusters().get("default");
assertNotNull(cluster);
assertComponentConfigured(cluster, "com.yahoo.vespa.curator.Curator");
+ assertComponentConfigured(cluster, "com.yahoo.vespa.curator.CuratorWrapper");
cluster.getContainers().forEach(container -> {
assertComponentConfigured(container, "com.yahoo.vespa.zookeeper.ReconfigurableVespaZooKeeperServer");
assertComponentConfigured(container, "com.yahoo.vespa.zookeeper.Reconfigurer");
diff --git a/vespajlib/src/main/java/com/yahoo/yolean/UncheckedInterruptedException.java b/vespajlib/src/main/java/com/yahoo/yolean/UncheckedInterruptedException.java
index d3317b5fb26..934a1b17c70 100644
--- a/vespajlib/src/main/java/com/yahoo/yolean/UncheckedInterruptedException.java
+++ b/vespajlib/src/main/java/com/yahoo/yolean/UncheckedInterruptedException.java
@@ -17,7 +17,7 @@ public class UncheckedInterruptedException extends RuntimeException {
this(cause.toString(), cause, restoreInterruptFlags);
}
- public UncheckedInterruptedException(String message, boolean restoreInterruptFlag) { this(message, null, false); }
+ public UncheckedInterruptedException(String message, boolean restoreInterruptFlag) { this(message, null, restoreInterruptFlag); }
public UncheckedInterruptedException(String message, InterruptedException cause) { this(message, cause, false); }
diff --git a/zkfacade/abi-spec.json b/zkfacade/abi-spec.json
index d227f8490dc..12e165b6cc9 100644
--- a/zkfacade/abi-spec.json
+++ b/zkfacade/abi-spec.json
@@ -1,4 +1,70 @@
{
+ "com.yahoo.vespa.curator.api.AbstractSingletonWorker": {
+ "superClass": "java.lang.Object",
+ "interfaces": [
+ "com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker"
+ ],
+ "attributes": [
+ "public",
+ "abstract"
+ ],
+ "methods": [
+ "public void <init>()",
+ "public java.lang.String id()",
+ "public final void register(com.yahoo.vespa.curator.api.VespaCurator, java.time.Duration)",
+ "public final void unregister(java.time.Duration)"
+ ],
+ "fields": []
+ },
+ "com.yahoo.vespa.curator.api.VespaCurator$Data": {
+ "superClass": "java.lang.Record",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "final",
+ "record"
+ ],
+ "methods": [
+ "public void <init>(com.yahoo.vespa.curator.api.VespaCurator$Meta, byte[])",
+ "public final java.lang.String toString()",
+ "public final int hashCode()",
+ "public final boolean equals(java.lang.Object)",
+ "public com.yahoo.vespa.curator.api.VespaCurator$Meta meta()",
+ "public byte[] data()"
+ ],
+ "fields": []
+ },
+ "com.yahoo.vespa.curator.api.VespaCurator$Meta": {
+ "superClass": "java.lang.Record",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "final",
+ "record"
+ ],
+ "methods": [
+ "public void <init>(int)",
+ "public final java.lang.String toString()",
+ "public final int hashCode()",
+ "public final boolean equals(java.lang.Object)",
+ "public int version()"
+ ],
+ "fields": []
+ },
+ "com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "interface",
+ "abstract"
+ ],
+ "methods": [
+ "public abstract void activate()",
+ "public abstract void deactivate()"
+ ],
+ "fields": []
+ },
"com.yahoo.vespa.curator.api.VespaCurator": {
"superClass": "java.lang.Object",
"interfaces": [],
@@ -8,7 +74,18 @@
"abstract"
],
"methods": [
- "public abstract java.lang.AutoCloseable lock(com.yahoo.path.Path, java.time.Duration)"
+ "public abstract java.util.Optional stat(com.yahoo.path.Path)",
+ "public abstract java.util.Optional read(com.yahoo.path.Path)",
+ "public abstract com.yahoo.vespa.curator.api.VespaCurator$Meta write(com.yahoo.path.Path, byte[])",
+ "public abstract java.util.Optional write(com.yahoo.path.Path, byte[], int)",
+ "public abstract void deleteAll(com.yahoo.path.Path)",
+ "public abstract void delete(com.yahoo.path.Path)",
+ "public abstract boolean delete(com.yahoo.path.Path, int)",
+ "public abstract java.util.List list(com.yahoo.path.Path)",
+ "public abstract java.lang.AutoCloseable lock(com.yahoo.path.Path, java.time.Duration)",
+ "public abstract java.util.concurrent.Future registerSingleton(java.lang.String, com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker)",
+ "public abstract java.util.concurrent.Future unregisterSingleton(com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker)",
+ "public abstract boolean isActive(java.lang.String)"
],
"fields": []
}
diff --git a/zkfacade/pom.xml b/zkfacade/pom.xml
index 17be17d8302..41053585185 100644
--- a/zkfacade/pom.xml
+++ b/zkfacade/pom.xml
@@ -13,6 +13,12 @@
<version>8-SNAPSHOT</version>
<dependencies>
<dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>testutil</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
index f159d471f55..1da51beef29 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
@@ -6,7 +6,6 @@ import com.yahoo.component.AbstractComponent;
import com.yahoo.component.annotation.Inject;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.path.Path;
-import com.yahoo.vespa.curator.api.VespaCurator;
import com.yahoo.vespa.curator.recipes.CuratorCounter;
import com.yahoo.vespa.defaults.Defaults;
import com.yahoo.vespa.zookeeper.VespaZooKeeperServer;
@@ -59,13 +58,13 @@ import java.util.logging.Logger;
* @author vegardh
* @author bratseth
*/
-public class Curator extends AbstractComponent implements VespaCurator, AutoCloseable {
+public class Curator extends AbstractComponent implements AutoCloseable {
private static final Logger LOG = Logger.getLogger(Curator.class.getName());
private static final File ZK_CLIENT_CONFIG_FILE = new File(Defaults.getDefaults().underVespaHome("conf/zookeeper/zookeeper-client.cfg"));
// Note that session timeout has min and max values are related to tickTime defined by server, see configserver.def
- private static final Duration ZK_SESSION_TIMEOUT = Duration.ofSeconds(120);
+ static final Duration ZK_SESSION_TIMEOUT = Duration.ofSeconds(120);
private static final Duration ZK_CONNECTION_TIMEOUT = Duration.ofSeconds(30);
private static final Duration BASE_SLEEP_TIME = Duration.ofSeconds(1);
@@ -195,7 +194,11 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos
* If the path and any of its parents does not exists they are created.
*/
// TODO: Use create().orSetData() in Curator 4 and later
- public void set(Path path, byte[] data) {
+ public Stat set(Path path, byte[] data) {
+ return set(path, data, -1);
+ }
+
+ public Stat set(Path path, byte[] data, int expectedVersion) {
if (data.length > juteMaxBuffer)
throw new IllegalArgumentException("Cannot not set data at " + path.getAbsolute() + ", " +
data.length + " bytes is too much, max number of bytes allowed per node is " + juteMaxBuffer);
@@ -205,12 +208,13 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos
String absolutePath = path.getAbsolute();
try {
- framework().setData().forPath(absolutePath, data);
+ return framework().setData().withVersion(expectedVersion).forPath(absolutePath, data);
} catch (Exception e) {
throw new RuntimeException("Could not set data at " + absolutePath, e);
}
}
+
/** @see #create(Path, Duration) */
public boolean create(Path path) { return create(path, null); }
@@ -220,6 +224,9 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos
* Returns whether a change was attempted.
*/
public boolean create(Path path, Duration ttl) {
+ return create(path, ttl, null);
+ }
+ private boolean create(Path path, Duration ttl, Stat stat) {
if (exists(path)) return false;
String absolutePath = path.getAbsolute();
@@ -231,7 +238,8 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos
throw new IllegalArgumentException(ttl.toString());
b.withTtl(millis).withMode(CreateMode.PERSISTENT_WITH_TTL);
}
- b.creatingParentsIfNeeded().forPath(absolutePath, new byte[0]);
+ if (stat == null) b.creatingParentsIfNeeded() .forPath(absolutePath, new byte[0]);
+ else b.creatingParentsIfNeeded().storingStatIn(stat).forPath(absolutePath, new byte[0]);
} catch (org.apache.zookeeper.KeeperException.NodeExistsException e) {
// Path created between exists() and create() call, do nothing
} catch (Exception e) {
@@ -258,12 +266,25 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos
}
/**
- * Deletes the given path and any children it may have.
- * If the path does not exists nothing is done.
+ * Deletes the path and any children it may have.
+ * If the path does not exist, nothing is done.
*/
public void delete(Path path) {
+ delete(path, true);
+ }
+
+ /**
+ * Deletes the path and any children it may have.
+ * If the path does not exist, nothing is done.
+ */
+ public void delete(Path path, boolean recursive) {
+ delete(path, -1, recursive);
+ }
+
+ public void delete(Path path, int expectedVersion, boolean recursive) {
try {
- framework().delete().guaranteed().deletingChildrenIfNeeded().forPath(path.getAbsolute());
+ if (recursive) framework().delete().guaranteed().deletingChildrenIfNeeded().withVersion(expectedVersion).forPath(path.getAbsolute());
+ else framework().delete().guaranteed() .withVersion(expectedVersion).forPath(path.getAbsolute());
} catch (KeeperException.NoNodeException e) {
// Do nothing
} catch (Exception e) {
@@ -290,8 +311,13 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos
* Empty is returned if the path does not exist.
*/
public Optional<byte[]> getData(Path path) {
+ return getData(path, null);
+ }
+
+ Optional<byte[]> getData(Path path, Stat stat) {
try {
- return Optional.of(framework().getData().forPath(path.getAbsolute()));
+ return stat == null ? Optional.of(framework().getData() .forPath(path.getAbsolute()))
+ : Optional.of(framework().getData().storingStatIn(stat).forPath(path.getAbsolute()));
}
catch (KeeperException.NoNodeException e) {
return Optional.empty();
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java
new file mode 100644
index 00000000000..ccde53fcf39
--- /dev/null
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java
@@ -0,0 +1,127 @@
+package com.yahoo.vespa.curator;
+
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.component.annotation.Inject;
+import com.yahoo.path.Path;
+import com.yahoo.vespa.curator.api.VespaCurator;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.zookeeper.KeeperException.BadVersionException;
+import org.apache.zookeeper.data.Stat;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation of {@link VespaCurator} which delegates to a {@link Curator}.
+ * This prefixes all paths with {@code "/user"}, to ensure separation with system ZK usage.
+ *
+ * @author jonmv
+ */
+public class CuratorWrapper extends AbstractComponent implements VespaCurator {
+
+ static final Path userRoot = Path.fromString("user");
+
+ private final Curator curator;
+ private final SingletonManager singletons;
+
+ @Inject
+ public CuratorWrapper(Curator curator) {
+ this(curator, Clock.systemUTC(), Duration.ofSeconds(1));
+ }
+
+ CuratorWrapper(Curator curator, Clock clock, Duration tickTimeout) {
+ this.curator = curator;
+ this.singletons = new SingletonManager(curator, clock, tickTimeout);
+
+ curator.framework().getConnectionStateListenable().addListener((curatorFramework, connectionState) -> {
+ if (connectionState == ConnectionState.LOST) singletons.invalidate();
+ });
+ }
+
+ @Override
+ public Optional<Meta> stat(Path path) {
+ return curator.getStat(userRoot.append(path)).map(stat -> new Meta(stat.getVersion()));
+ }
+
+ @Override
+ public Optional<Data> read(Path path) {
+ Stat stat = new Stat();
+ return curator.getData(userRoot.append(path), stat).map(data -> new Data(new Meta(stat.getVersion()), data));
+ }
+
+ @Override
+ public Meta write(Path path, byte[] data) {
+ return new Meta(curator.set(userRoot.append(path), data).getVersion());
+ }
+
+ @Override
+ public Optional<Meta> write(Path path, byte[] data, int expectedVersion) {
+ try {
+ return Optional.of(new Meta(curator.set(userRoot.append(path), data, expectedVersion).getVersion()));
+ }
+ catch (RuntimeException e) {
+ if (e.getCause() instanceof BadVersionException) return Optional.empty();
+ throw e;
+ }
+ }
+
+ @Override
+ public void deleteAll(Path path) {
+ curator.delete(userRoot.append(path));
+ }
+
+ @Override
+ public void delete(Path path) {
+ curator.delete(userRoot.append(path), false);
+ }
+
+ @Override
+ public boolean delete(Path path, int expectedVersion) {
+ try {
+ curator.delete(userRoot.append(path), expectedVersion, false);
+ return true;
+ }
+ catch (RuntimeException e) {
+ if (e.getCause() instanceof BadVersionException) return false;
+ throw e;
+ }
+ }
+
+ @Override
+ public List<String> list(Path path) {
+ return curator.getChildren(userRoot.append(path));
+ }
+
+ @Override
+ public AutoCloseable lock(Path path, Duration timeout) {
+ // TODO jonmv: clear up
+ Lock current, old = curator.lock(path, timeout);
+ try { current = curator.lock(userRoot.append(path), timeout); }
+ catch (Throwable t) { old.close(); throw t; }
+ return () -> { try(old) { current.close(); } };
+ }
+
+ @Override
+ public CompletableFuture<?> registerSingleton(String singletonId, SingletonWorker singleton) {
+ return singletons.register(singletonId, singleton);
+ }
+
+ @Override
+ public CompletableFuture<?> unregisterSingleton(SingletonWorker singleton) {
+ return singletons.unregister(singleton);
+ }
+
+ @Override
+ public boolean isActive(String singletonId) {
+ return singletons.isActive(singletonId);
+ }
+
+ @Override
+ public void deconstruct() {
+ singletons.close();
+ }
+
+}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java
new file mode 100644
index 00000000000..cf6e51836ec
--- /dev/null
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java
@@ -0,0 +1,330 @@
+package com.yahoo.vespa.curator;
+
+import com.yahoo.path.Path;
+import com.yahoo.protect.Process;
+import com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages {@link com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker}.
+ *
+ * @author jonmv
+ */
+class SingletonManager implements AutoCloseable {
+
+ private static final Logger logger = Logger.getLogger(SingletonManager.class.getName());
+
+ private final Curator curator;
+ private final Clock clock;
+ private final Duration tickTimeout;
+ private final Map<String, Janitor> janitors = new HashMap<>();
+ private final Map<String, Integer> count = new HashMap<>();
+ private final Map<SingletonWorker, String> registrations = new IdentityHashMap<>();
+
+ SingletonManager(Curator curator, Clock clock, Duration tickTimeout) {
+ this.curator = curator;
+ this.clock = clock;
+ this.tickTimeout = tickTimeout;
+ }
+
+ synchronized CompletableFuture<?> register(String singletonId, SingletonWorker singleton) {
+ if (singletonId.isEmpty() || singletonId.contains("/") || singletonId.contains("..")) {
+ throw new IllegalArgumentException("singleton ID must be non-empty, and may not contain '/' or '..', but got " + singletonId);
+ }
+ String old = registrations.putIfAbsent(singleton, singletonId);
+ if (old != null) throw new IllegalArgumentException(singleton + " already registered with ID " + old);
+ count.merge(singletonId, 1, Integer::sum);
+ return janitors.computeIfAbsent(singletonId, Janitor::new).register(singleton);
+ }
+
+ synchronized CompletableFuture<?> unregister(SingletonWorker singleton) {
+ String id = registrations.remove(singleton);
+ if (id == null) throw new IllegalArgumentException(singleton + " is not registered");
+ return janitors.get(id).unregister(singleton)
+ .whenComplete((__, ___) -> unregistered(id, singleton));
+ }
+
+ synchronized void unregistered(String singletonId, SingletonWorker singleton) {
+ registrations.remove(singleton);
+ if (count.merge(singletonId, -1, Integer::sum) > 0) return;
+ count.remove(singletonId);
+ janitors.remove(singletonId).shutdown();
+ }
+
+ /**
+ * The instant until which this container holds the exclusive lease for activation of singletons with this ID.
+ * The container may abandon the lease early, if deactivation is triggered and completes before the deadline.
+ * Unless connection to the underlying ZK cluster is lost, the returned value will regularly move forwards in time.
+ */
+ synchronized Optional<Instant> activeUntil(String singletonId) {
+ return Optional.ofNullable(janitors.get(singletonId)).map(janitor -> janitor.doom.get());
+ }
+
+ /** Whether this container currently holds the activation lease for the given singleton ID. */
+ boolean isActive(String singletonId) {
+ return activeUntil(singletonId).map(clock.instant()::isBefore).orElse(false);
+ }
+
+ /** Invalidate all leases, due to connection loss. */
+ synchronized void invalidate() {
+ for (Janitor janitor : janitors.values()) janitor.invalidate();
+ }
+
+ @Override
+ public synchronized void close() {
+ for (SingletonWorker singleton : List.copyOf(registrations.keySet())) {
+ String id = registrations.get(singleton);
+ logger.log(Level.WARNING, singleton + " still registered with id '" + id + "' at shutdown");
+ unregister(singleton);
+ }
+ }
+
+
+ private class Janitor {
+
+ static class Task {
+
+ enum Type { register, unregister }
+
+ final Type type;
+ final SingletonWorker singleton;
+ final CompletableFuture<?> future = new CompletableFuture<>();
+
+ private Task(Type type, SingletonWorker singleton) {
+ this.type = type;
+ this.singleton = singleton;
+ }
+
+ static Task register(SingletonWorker singleton) { return new Task(Type.register, singleton); }
+ static Task unregister(SingletonWorker singleton) { return new Task(Type.unregister, singleton); }
+
+ }
+
+ private static final Instant INVALID = Instant.ofEpochMilli(-1);
+
+ final BlockingDeque<Task> tasks = new LinkedBlockingDeque<>();
+ final Deque<SingletonWorker> singletons = new ArrayDeque<>(2);
+ final AtomicReference<Instant> doom = new AtomicReference<>();
+ final AtomicBoolean shutdown = new AtomicBoolean();
+ final Thread worker;
+ final String id;
+ final Path path;
+ Lock lock = null;
+ boolean active;
+
+
+ Janitor(String id) {
+ this.id = id;
+ this.path = Path.fromString("/vespa/singleton/v1/" + id);
+ this.worker = new Thread(this::run, "singleton-janitor-" + id + "-");
+
+ worker.setDaemon(true);
+ worker.start();
+ }
+
+ public void unlock() {
+ doom.set(null);
+ if (lock != null) try {
+ lock.close();
+ lock = null;
+ }
+ catch (Exception e) {
+ logger.log(Level.WARNING, "failed closing " + lock, e);
+ }
+ }
+
+ private void run() {
+ try {
+ while ( ! shutdown.get()) {
+ try {
+ // Attempt to acquire the lock, and extend our doom.
+ renewLease();
+ // Ensure activation status is set accordingly to doom, or clear state if this fails.
+ updateStatus();
+ // Process the next pending, externally triggered task, if any.
+ doTask();
+ }
+ catch (InterruptedException e) {
+ if ( ! shutdown.get()) {
+ logger.log(Level.WARNING, worker + " interrupted, restarting event loop");
+ }
+ }
+ }
+ for (Task task : tasks) task.future.cancel(true);
+ unlock();
+ }
+ catch (Throwable t) {
+ Process.logAndDie(worker + " can't continue, shutting down", t);
+ }
+ }
+
+ protected void doTask() throws InterruptedException {
+ Task task = tasks.poll(tickTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ try {
+ if (task != null) switch (task.type) {
+ case register -> {
+ doRegister(task.singleton);
+ task.future.complete(null);
+ }
+ case unregister -> {
+ doUnregister(task.singleton);
+ task.future.complete(null);
+ }
+ default -> throw new AssertionError("unknown task type '" + task.type + "'");
+ }
+ }
+ catch (RuntimeException e) {
+ logger.log(Level.WARNING, "Uncaught exception in " + worker, e);
+ task.future.completeExceptionally(e);
+ }
+ }
+
+ private void doRegister(SingletonWorker singleton) {
+ SingletonWorker current = singletons.peek();
+ singletons.push(singleton);
+ if (active) {
+ RuntimeException e = null;
+ if (current != null) try {
+ current.deactivate();
+ }
+ catch (RuntimeException f) {
+ e = f;
+ }
+ try {
+ singleton.activate();
+ }
+ catch (RuntimeException f) {
+ if (e == null) e = f;
+ else e.addSuppressed(f);
+ }
+ if (e != null) throw e;
+ }
+ }
+
+ private void doUnregister(SingletonWorker singleton) {
+ RuntimeException e = null;
+ SingletonWorker current = singletons.peek();
+ if ( ! singletons.remove(singleton)) return;
+ if (active && current == singleton) {
+ try {
+ singleton.deactivate();
+ }
+ catch (RuntimeException f) {
+ e = f;
+ }
+ if ( ! singletons.isEmpty()) try {
+ singletons.peek().activate();
+ }
+ catch (RuntimeException f) {
+ if (e == null) e = f;
+ else e.addSuppressed(f);
+ }
+ }
+ if (singletons.isEmpty()) {
+ unlock();
+ }
+ if (e != null) throw e;
+ }
+
+ /**
+ * Attempt to acquire the lock, if not held.
+ * If lock is held, or acquired, ping the ZK cluster to extend our deadline.
+ */
+ private void renewLease() {
+ if (doom.get() == INVALID) {
+ unlock();
+ }
+ // Witness value to detect if invalidation occurs between here and successful ping.
+ Instant ourDoom = doom.get();
+ Instant start = clock.instant();
+ if (lock == null) try {
+ lock = curator.lock(path.append("lock"), tickTimeout);
+ }
+ catch (RuntimeException e) {
+ logger.log(Level.FINE, "Failed acquiring lock for '" + path + "' within " + tickTimeout, e);
+ return;
+ }
+ try {
+ curator.set(path.append("ping"), new byte[0]);
+ }
+ catch (RuntimeException e) {
+ logger.log(Level.FINE, "Failed pinging ZK cluster", e);
+ return;
+ }
+ if ( ! doom.compareAndSet(ourDoom, start.plus(Curator.ZK_SESSION_TIMEOUT.multipliedBy(9).dividedBy(10)))) {
+ logger.log(Level.FINE, "Deadline changed, invalidating current lease renewal");
+ }
+ }
+
+ /**
+ * Attempt to activate or deactivate if status has changed.
+ * If activation fails, we release the lock, to a different container may acquire it.
+ */
+ private void updateStatus() {
+ Instant ourDoom = doom.get();
+ boolean shouldBeActive = ourDoom != null && ! clock.instant().isAfter(ourDoom);
+ if ( ! active && shouldBeActive) {
+ try {
+ active = true;
+ if ( ! singletons.isEmpty()) singletons.peek().activate();
+ }
+ catch (RuntimeException e) {
+ logger.log(Level.WARNING, "Failed to activate " + singletons.peek() + ", deactivating again", e);
+ shouldBeActive = false;
+ }
+ }
+ if (active && ! shouldBeActive) {
+ try {
+ active = false;
+ if ( ! singletons.isEmpty()) singletons.peek().deactivate();
+ }
+ catch (RuntimeException e) {
+ logger.log(Level.WARNING, "Failed to deactivate " + singletons.peek(), e);
+ }
+ unlock();
+ }
+ }
+
+ CompletableFuture<?> register(SingletonWorker singleton) {
+ Task task = Task.register(singleton);
+ tasks.offer(task);
+ return task.future;
+ }
+
+ CompletableFuture<?> unregister(SingletonWorker singleton) {
+ Task task = Task.unregister(singleton);
+ tasks.offer(task);
+ return task.future;
+ }
+
+ void invalidate() {
+ doom.set(INVALID);
+ }
+
+ void shutdown() {
+ if ( ! shutdown.compareAndSet(false, true)) {
+ logger.log(Level.WARNING, "Shutdown called more than once on " + this);
+ }
+ }
+
+ }
+
+}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java
new file mode 100644
index 00000000000..dc0540e73c5
--- /dev/null
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java
@@ -0,0 +1,95 @@
+package com.yahoo.vespa.curator.api;
+
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.concurrent.UncheckedTimeoutException;
+import com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker;
+import com.yahoo.yolean.UncheckedInterruptedException;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Super-class for implementations of {@link SingletonWorker}.
+ * Users should call {@link VespaCurator#registerSingleton} at construction, and
+ * {@link VespaCurator#unregisterSingleton} at deconstruction.
+ * If this fails activation on registration, it will immediately unregister as well, before propagating the error.
+ * Consequently, registering this on construction will allow detecting a failed component generation, and instead
+ * retain the previous generation, provided enough verification is done in {@link #activate()}.
+ * The default ID to use with registration is the concrete class name, e.g., {@code my.example.Singleton}.
+ *
+ * @author jonmv
+ */
+public abstract class AbstractSingletonWorker implements SingletonWorker {
+
+ private final AtomicReference<VespaCurator> owner = new AtomicReference<>();
+
+ /**
+ * The singleton ID to use when registering this with a {@link VespaCurator}.
+ * At most one singleton worker with the given ID will be active, in the cluster, at any time.
+ * {@link VespaCurator#isActive(String)} may be polled to see whether this container is currently
+ * allowed to have an active singleton with the given ID.
+ */
+ public String id() { return getClass().getName(); }
+
+ /**
+ * <strong>Call this at the end of construction of the owner of this.</strong>
+ * If this activates the singleton, this happens synchronously, and any errors are propagated here.
+ * If this replaces an already active singleton, its deactivation is also called, prior to activation of this.
+ * If (de)activation is not complete within the given timeout, a timeout exception is thrown.
+ * If an error occurs (due to failed activation), unregistration is automatically attempted, with the same timeout.
+ */
+ public final void register(VespaCurator curator, Duration timeout) {
+ if ( ! owner.compareAndSet(null, curator)) {
+ throw new IllegalArgumentException(this + " is already registered with " + owner.get());
+ }
+ try {
+ await(curator.registerSingleton(id(), this), timeout, "register");
+ }
+ catch (RuntimeException e) {
+ try {
+ unregister(timeout);
+ }
+ catch (Exception f) {
+ e.addSuppressed(f);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * <strong>Call this at the start of deconstruction of the owner of this!</strong>
+ * <p>
+ * If this singleton is active, deactivation will be called synchronously, and errors propagated here.
+ * If this also triggers activation of a new singleton, its activation is called after deactivation of this.
+ * If (de)activation is not complete within the given timeout, a timeout exception is thrown.
+ */
+ public final void unregister(Duration timeout) {
+ VespaCurator curator = owner.getAndSet(null);
+ if (curator == null) {
+ throw new IllegalArgumentException(this + " was not registered with any owners");
+ }
+ await(curator.unregisterSingleton(this), timeout, "unregister");
+ }
+
+ private void await(Future<?> future, Duration timeout, String verb) {
+ try {
+ future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ future.cancel(true);
+ throw new UncheckedInterruptedException("interrupted while " + verb + "ing " + this, e, true);
+ }
+ catch (TimeoutException e) {
+ future.cancel(true);
+ throw new UncheckedTimeoutException("timed out while " + verb + "ing " + this, e);
+ }
+ catch (ExecutionException e) {
+ throw new RuntimeException("failed to " + verb + " " + this, e.getCause());
+ }
+ }
+
+}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java
index 32aef2da118..6ac44631ed2 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java
@@ -5,16 +5,119 @@ import com.yahoo.concurrent.UncheckedTimeoutException;
import com.yahoo.path.Path;
import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
/**
* A client for a ZooKeeper cluster running inside Vespa. Applications that want to use ZooKeeper can inject this in
* their code.
*
* @author mpolden
+ * @author jonmv
*/
public interface VespaCurator {
- /** Create and acquire a re-entrant lock in given path. This blocks until the lock is acquired or timeout elapses. */
+ /** Returns the stat for the node at the given path, or empty if no node exists at that path. */
+ Optional<Meta> stat(Path path);
+
+ /** Returns the content and stat for the node at the given path, or empty if no node exists at that path. */
+ Optional<Data> read(Path path);
+
+ /**
+ * Writes the given data to a node at the given path, creating it and its parents as needed, and returns the
+ * stat of the modified node. Failure to write, due to connection loss, is retried a limited number of times.
+ */
+ Meta write(Path path, byte[] data);
+
+ /**
+ * Atomically compares the version in the stat of the node at the given path, with the expected version, and then:
+ * if they are equal, attempts the write operation (see {@link #write(Path, byte[])});
+ * otherwise, return empty.
+ */
+ Optional<Meta> write(Path path, byte[] data, int expectedVersion);
+
+ /** Recursively deletes any node at the given path, and any children it may have. */
+ void deleteAll(Path path);
+
+ /** Deletes the node at the given path. Failres due to connection loss are retried a limited number of times. */
+ void delete(Path path);
+
+ /**
+ * Atomically compares the version in the stat of the node at the given path, with the expected version, and then:
+ * if they are equal, attempts the delete operation (see {@link #delete(Path)}), and returns {@code} true;
+ * otherwise, returns {@code false}.
+ */
+ boolean delete(Path path, int expectedVersion);
+
+ /** Lists the children of the node at the given path, or throws if there is no node at that path. */
+ List<String> list(Path path);
+
+ /** Creates and acquires a re-entrant lock with the given path. This blocks until the lock is acquired or timeout elapses. */
AutoCloseable lock(Path path, Duration timeout) throws UncheckedTimeoutException;
+ /** Data of a ZK node, including content (possibly empty, never {@code null}) and metadata. */
+ record Data(Meta meta, byte[] data) { }
+
+ /** Metadata for a ZK node. */
+ record Meta(int version) { }
+
+ /**
+ * Register the singleton with the framework, so it may become active, and returns a
+ * synchronisation handle to any deactivations or activations triggered by this.
+ * If there is already another active singleton with the given ID (in this JVM),
+ * that will be deactivated before the new one is activated.
+ */
+ Future<?> registerSingleton(String singletonId, SingletonWorker singleton);
+
+ /**
+ * Unregister with the framework, so this singleton may no longer be active, and returns
+ * a synchronisation handle to any deactivation or activation triggered by this.
+ * If this is the last singleton registered with its ID, then this container immediately releases
+ * the activation lease for that ID, so another container may acquire it.
+ */
+ Future<?> unregisterSingleton(SingletonWorker singleton);
+
+ /**
+ * Whether this container currently holds the exclusive lease for activation of singletons with this ID.
+ */
+ boolean isActive(String singletonId);
+
+ /**
+ * Callback interface for processes of which only a single instance should be active at any time, across all
+ * containers in the cluster, and across all component generations. Notes:
+ * <ul>
+ * <li>{@link #activate()} is called by the system on a singleton whenever it is the newest registered
+ * singleton in this container, and this container has the lease for the ID with which the singleton
+ * was registered. See {@link #registerSingleton} and {@link #isActive}.</li>
+ * <li>{@link #deactivate()} is called by the system on a singleton which is currently active whenever
+ * the above no longer holds. See {@link #unregisterSingleton}.</li>
+ * <li>Callbacks for the same ID are always invoked by the same thread, in serial;
+ * the callbacks must return in a timely manner, but are allowed to throw exceptions.</li>
+ * <li>Activation and deactivation may be triggered by:
+ * <ol><li>the container acquiring or losing the activation lease; or</li>
+ * <li>registration of unregistration of a new or obsolete singleton.</li></ol>
+ * Events triggered by the latter happen synchronously, and errors are propagated to the caller for cleanup.
+ * Events triggered by the former may happen in the background, and because the system tries to always have
+ * one activated singleton, exceptions during activation will cause the container to abandon its lease, so
+ * another container may obtain it instead; exceptions during deactivation are only logged.
+ * </li>
+ * <li>A container without any registered singletons will not attempt to hold the activation lease.</li>
+ * </ul>
+ * See {@link AbstractSingletonWorker} for an abstract superclass to use for implementations.
+ */
+ interface SingletonWorker {
+
+ /**
+ * Called by the system whenever this singleton becomes the single active worker.
+ * If this is triggered because the container obtains the activation lease, and activation fails,
+ * then the container immediately releases the lease, so another container may acquire it instead.
+ */
+ void activate();
+
+ /** Called by the system whenever this singleton is no longer the single active worker. */
+ void deactivate();
+
+ }
+
}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
index 81fb24bd7e5..c8566015ea1 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
@@ -8,6 +8,7 @@ import com.yahoo.concurrent.UncheckedTimeoutException;
import com.yahoo.path.Path;
import com.yahoo.vespa.curator.CompletionTimeoutException;
import com.yahoo.vespa.curator.Curator;
+import com.yahoo.vespa.curator.mock.MemoryFileSystem.Node;
import com.yahoo.vespa.curator.recipes.CuratorLockException;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.framework.CuratorFramework;
@@ -83,6 +84,8 @@ import org.apache.curator.retry.RetryForever;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.BadVersionException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
@@ -124,6 +127,8 @@ public class MockCuratorFramework implements CuratorFramework {
/** Listeners to changes to a particular path */
private final ListenerMap listeners = new ListenerMap();
+ public final MockListenable<ConnectionStateListener> connectionStateListeners = new MockListenable<>();
+
private CuratorFrameworkState curatorState = CuratorFrameworkState.LATENT;
private int monotonicallyIncreasingNumber = 0;
@@ -267,7 +272,7 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public Listenable<ConnectionStateListener> getConnectionStateListenable() {
- return new MockListenable<>();
+ return connectionStateListeners;
}
@Override
@@ -351,7 +356,7 @@ public class MockCuratorFramework implements CuratorFramework {
// ----- Start of adaptor methods from Curator to the mock file system -----
/** Creates a node below the given directory root */
- private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, MemoryFileSystem.Node root, Listeners listeners, Long ttl)
+ private String createNode(String pathString, byte[] content, boolean createParents, Stat stat, CreateMode createMode, MemoryFileSystem.Node root, Listeners listeners, Long ttl)
throws KeeperException.NodeExistsException, KeeperException.NoNodeException {
validatePath(pathString);
Path path = Path.fromString(pathString);
@@ -371,18 +376,21 @@ public class MockCuratorFramework implements CuratorFramework {
node.setTtl(ttl);
String nodePath = "/" + path.getParentPath().toString() + "/" + name;
listeners.notify(Path.fromString(nodePath), content, PathChildrenCacheEvent.Type.CHILD_ADDED);
+ if (stat != null) stat.setVersion(node.version());
return nodePath;
}
/** Deletes a node below the given directory root */
- private void deleteNode(String pathString, boolean deleteChildren, MemoryFileSystem.Node root, Listeners listeners)
- throws KeeperException.NoNodeException, KeeperException.NotEmptyException {
+ private void deleteNode(String pathString, boolean deleteChildren, int version, MemoryFileSystem.Node root, Listeners listeners)
+ throws KeeperException {
validatePath(pathString);
Path path = Path.fromString(pathString);
MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false);
if (parent == null) throw new KeeperException.NoNodeException(path.toString());
MemoryFileSystem.Node node = parent.children().get(path.getName());
if (node == null) throw new KeeperException.NoNodeException(path.getName() + " under " + parent);
+ if (version != -1 && version != node.version())
+ throw new KeeperException.BadVersionException("expected version " + version + ", but was " + node.version());
if ( ! node.children().isEmpty() && ! deleteChildren)
throw new KeeperException.NotEmptyException(path.toString());
parent.remove(path.getName());
@@ -390,16 +398,20 @@ public class MockCuratorFramework implements CuratorFramework {
}
/** Returns the data of a node */
- private byte[] getData(String pathString, MemoryFileSystem.Node root) throws KeeperException.NoNodeException {
+ private byte[] getData(String pathString, Stat stat, MemoryFileSystem.Node root) throws KeeperException.NoNodeException {
validatePath(pathString);
- return getNode(pathString, root).getContent();
+ return getNode(pathString, stat, root).getContent();
}
/** sets the data of an existing node */
- private void setData(String pathString, byte[] content, MemoryFileSystem.Node root, Listeners listeners)
- throws KeeperException.NoNodeException {
+ private void setData(String pathString, byte[] content, int version, Stat stat, MemoryFileSystem.Node root, Listeners listeners)
+ throws KeeperException {
validatePath(pathString);
- getNode(pathString, root).setContent(content);
+ Node node = getNode(pathString, null, root);
+ if (version != -1 && version != node.version())
+ throw new KeeperException.BadVersionException("expected version " + version + ", but was " + node.version());
+ node.setContent(content);
+ if (stat != null) stat.setVersion(node.version());
listeners.notify(Path.fromString(pathString), content, PathChildrenCacheEvent.Type.CHILD_UPDATED);
}
@@ -414,13 +426,14 @@ public class MockCuratorFramework implements CuratorFramework {
}
/** Returns a node or throws the appropriate exception if it doesn't exist */
- private MemoryFileSystem.Node getNode(String pathString, MemoryFileSystem.Node root) throws KeeperException.NoNodeException {
+ private MemoryFileSystem.Node getNode(String pathString, Stat stat, MemoryFileSystem.Node root) throws KeeperException.NoNodeException {
validatePath(pathString);
Path path = Path.fromString(pathString);
MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false);
if (parent == null) throw new KeeperException.NoNodeException(path.toString());
MemoryFileSystem.Node node = parent.children().get(path.getName());
if (node == null) throw new KeeperException.NoNodeException(path.toString());
+ if (stat != null) stat.setVersion(node.version());
return node;
}
@@ -783,6 +796,7 @@ public class MockCuratorFramework implements CuratorFramework {
private abstract static class MockProtectACLCreateModeStatPathAndBytesable<String>
implements ProtectACLCreateModeStatPathAndBytesable<String> {
+ Stat stat = null;
public BackgroundPathAndBytesable<String> withACL(List<ACL> list) {
throw new UnsupportedOperationException("Not implemented in MockCurator");
}
@@ -832,6 +846,7 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public ACLBackgroundPathAndBytesable<String> storingStatIn(Stat stat) {
+ this.stat = stat;
return this;
}
@@ -850,12 +865,12 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public String forPath(String s, byte[] bytes) throws Exception {
- return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl);
+ return createNode(s, bytes, createParents, stat, createMode, fileSystem.root(), listeners, ttl);
}
@Override
public String forPath(String s) throws Exception {
- return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl);
+ return createNode(s, new byte[0], createParents, stat, createMode, fileSystem.root(), listeners, ttl);
}
};
@@ -867,12 +882,12 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public String forPath(String s, byte[] bytes) throws Exception {
- return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl);
+ return createNode(s, bytes, createParents, stat, createMode, fileSystem.root(), listeners, ttl);
}
@Override
public String forPath(String s) throws Exception {
- return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl);
+ return createNode(s, new byte[0], createParents, stat, createMode, fileSystem.root(), listeners, ttl);
}
};
@@ -890,11 +905,11 @@ public class MockCuratorFramework implements CuratorFramework {
}
public String forPath(String s) throws Exception {
- return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl);
+ return createNode(s, new byte[0], createParents, null, createMode, fileSystem.root(), listeners, ttl);
}
public String forPath(String s, byte[] bytes) throws Exception {
- return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl);
+ return createNode(s, bytes, createParents, null, createMode, fileSystem.root(), listeners, ttl);
}
@Override
@@ -1043,9 +1058,8 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public Stat forPath(String path) throws Exception {
try {
- MemoryFileSystem.Node node = getNode(path, fileSystem.root());
Stat stat = new Stat();
- stat.setVersion(node.version());
+ getNode(path, stat, fileSystem.root());
return stat;
}
catch (KeeperException.NoNodeException e) {
@@ -1067,6 +1081,7 @@ public class MockCuratorFramework implements CuratorFramework {
private class MockDeleteBuilder extends MockBackgroundPathableBuilder<Void> implements DeleteBuilder {
private boolean deleteChildren = false;
+ private int version = -1;
@Override
public BackgroundVersionable deletingChildrenIfNeeded() {
@@ -1081,11 +1096,12 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public BackgroundPathable<Void> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
+ version = i;
+ return this;
}
public Void forPath(String pathString) throws Exception {
- deleteNode(pathString, deleteChildren, fileSystem.root(), listeners);
+ deleteNode(pathString, deleteChildren, version, fileSystem.root(), listeners);
return null;
}
@@ -1107,7 +1123,7 @@ public class MockCuratorFramework implements CuratorFramework {
}
public byte[] forPath(String path) throws Exception {
- return getData(path, fileSystem.root());
+ return getData(path, null, fileSystem.root());
}
@Override
@@ -1142,13 +1158,34 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public WatchPathable<byte[]> storingStatIn(Stat stat) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
+ return new WatchPathable<byte[]>() {
+ @Override
+ public byte[] forPath(String path) throws Exception {
+ return getData(path, stat, fileSystem.root());
+ }
+
+ @Override
+ public Pathable<byte[]> watched() {
+ return null;
+ }
+
+ @Override
+ public Pathable<byte[]> usingWatcher(Watcher watcher) {
+ return null;
+ }
+
+ @Override
+ public Pathable<byte[]> usingWatcher(CuratorWatcher watcher) {
+ return null;
+ }
+ };
}
}
// extends MockBackgroundACLPathAndBytesableBuilder<Stat>
private class MockSetDataBuilder implements SetDataBuilder {
+ int version = -1;
@Override
public SetDataBackgroundVersionable compressed() {
throw new UnsupportedOperationException("Not implemented in MockCurator");
@@ -1156,18 +1193,22 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public BackgroundPathAndBytesable<Stat> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
+ version = i;
+ return this;
}
@Override
public Stat forPath(String path, byte[] bytes) throws Exception {
- setData(path, bytes, fileSystem.root(), listeners);
- return null;
+ Stat stat = new Stat();
+ setData(path, bytes, version, stat, fileSystem.root(), listeners);
+ return stat;
}
@Override
- public Stat forPath(String s) throws Exception {
- return null;
+ public Stat forPath(String path) throws Exception {
+ Stat stat = new Stat();
+ setData(path, new byte[0], version, stat, fileSystem.root(), listeners);
+ return stat;
}
@Override
@@ -1206,18 +1247,21 @@ public class MockCuratorFramework implements CuratorFramework {
}
/** Allows addition of directoryListeners which are never called */
- private static class MockListenable<T> implements Listenable<T> {
+ public static class MockListenable<T> implements Listenable<T> {
+
+ public final List<T> listeners = new ArrayList<>();
@Override
public void addListener(T t) {
+ listeners.add(t);
}
@Override
- public void addListener(T t, Executor executor) {
- }
+ public void addListener(T t, Executor executor) { throw new UnsupportedOperationException("not supported in mock curator"); }
@Override
public void removeListener(T t) {
+ listeners.remove(t);
}
}
@@ -1288,13 +1332,13 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception {
- createNode(s, bytes, false, createMode, newRoot, delayedListener, null);
+ createNode(s, bytes, false, null, createMode, newRoot, delayedListener, null);
return new MockCuratorTransactionBridge();
}
@Override
public CuratorTransactionBridge forPath(String s) throws Exception {
- createNode(s, new byte[0], false, createMode, newRoot, delayedListener, null);
+ createNode(s, new byte[0], false, null, createMode, newRoot, delayedListener, null);
return new MockCuratorTransactionBridge();
}
@@ -1316,14 +1360,16 @@ public class MockCuratorFramework implements CuratorFramework {
private class MockTransactionDeleteBuilder implements TransactionDeleteBuilder<CuratorTransactionBridge> {
+ int version = -1;
@Override
public Pathable<CuratorTransactionBridge> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
+ version = i;
+ return this;
}
@Override
public CuratorTransactionBridge forPath(String path) throws Exception {
- deleteNode(path, false, newRoot, delayedListener);
+ deleteNode(path, false, version, newRoot, delayedListener);
return new MockCuratorTransactionBridge();
}
@@ -1331,6 +1377,7 @@ public class MockCuratorFramework implements CuratorFramework {
private class MockTransactionSetDataBuilder implements TransactionSetDataBuilder<CuratorTransactionBridge> {
+ int version = -1;
@Override
public VersionPathAndBytesable<CuratorTransactionBridge> compressed() {
throw new UnsupportedOperationException("Not implemented in MockCurator");
@@ -1338,18 +1385,19 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public PathAndBytesable<CuratorTransactionBridge> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
+ version = i;
+ return this;
}
@Override
public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception {
- MockCuratorFramework.this.setData(s, bytes, newRoot, delayedListener);
+ MockCuratorFramework.this.setData(s, bytes, version, null, newRoot, delayedListener);
return new MockCuratorTransactionBridge();
}
@Override
public CuratorTransactionBridge forPath(String s) throws Exception {
- MockCuratorFramework.this.setData(s, new byte[0], newRoot, delayedListener);
+ MockCuratorFramework.this.setData(s, new byte[0], version, null, newRoot, delayedListener);
return new MockCuratorTransactionBridge();
}
diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java
new file mode 100644
index 00000000000..131222e62f8
--- /dev/null
+++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java
@@ -0,0 +1,246 @@
+package com.yahoo.vespa.curator;
+
+import com.yahoo.path.Path;
+import com.yahoo.test.ManualClock;
+import com.yahoo.vespa.curator.api.AbstractSingletonWorker;
+import com.yahoo.vespa.curator.api.VespaCurator;
+import com.yahoo.vespa.curator.api.VespaCurator.Meta;
+import com.yahoo.vespa.curator.mock.MockCurator;
+import com.yahoo.vespa.curator.mock.MockCuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.junit.Test;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author jonmv
+ */
+public class CuratorWrapperTest {
+
+ static final Path lockPath = Path.fromString("/vespa/singleton/v1/singleton/lock");
+
+ @Test
+ public void testUserApi() throws Exception {
+ try (Curator wrapped = new MockCurator()) {
+ CuratorWrapper curator = new CuratorWrapper(wrapped);
+
+ Path path = Path.fromString("path");
+ assertEquals(Optional.empty(), curator.stat(path));
+
+ Meta meta = curator.write(path, "data".getBytes(UTF_8));
+ assertEquals(Optional.of(meta), curator.stat(path));
+
+ assertEquals("data", new String(curator.read(path).get().data(), UTF_8));
+ assertEquals(meta, curator.read(path).get().meta());
+
+ assertEquals(Optional.empty(), curator.write(path, new byte[0], 0));
+
+ meta = curator.write(path, new byte[0], meta.version()).get();
+ assertEquals(3, meta.version());
+
+ assertEquals(List.of("path"), curator.list(Path.createRoot()));
+
+ assertFalse(curator.delete(path, 0));
+
+ curator.delete(path, 3);
+
+ assertEquals(List.of(), curator.list(Path.createRoot()));
+
+ try (AutoCloseable lock = curator.lock(path, Duration.ofSeconds(1))) {
+ assertEquals(List.of("user", "path"), wrapped.getChildren(Path.createRoot()));
+ assertEquals(List.of("path"), wrapped.getChildren(CuratorWrapper.userRoot));
+ }
+
+ try (AutoCloseable lock = curator.lock(path, Duration.ofSeconds(1))) {
+ // Both previous locks were released.
+ }
+ }
+ }
+
+ @Test
+ public void testSingleSingleton() {
+ try (Curator wrapped = new MockCurator()) {
+ Phaser stunning = new Phaser(1);
+ ManualClock clock = new ManualClock() {
+ @Override public Instant instant() {
+ stunning.arriveAndAwaitAdvance();
+ // Let test thread advance time when desired.
+ stunning.arriveAndAwaitAdvance();
+ return super.instant();
+ };
+ };
+ CuratorWrapper curator = new CuratorWrapper(wrapped, clock, Duration.ofMillis(100));
+
+ // First singleton to register becomes active during construction.
+ Singleton singleton = new Singleton(curator);
+ assertTrue(singleton.isActive);
+ assertTrue(wrapped.exists(lockPath));
+ assertTrue(curator.isActive(singleton.id()));
+ singleton.shutdown();
+ assertFalse(singleton.isActive);
+ // ... and deactivated as a result of unregistering again.
+
+ // Singleton can be set up again, but this time, time runs away.
+ singleton = new Singleton(curator) {
+ @Override public void activate() {
+ // Set up sync in clock on next renewLease.
+ super.activate();
+ stunning.register();
+ }
+ };
+ assertTrue(singleton.isActive);
+
+ stunning.arriveAndAwaitAdvance(); // Wait for next renewLease.
+ stunning.arriveAndAwaitAdvance(); // Let next renewLease complete.
+ stunning.arriveAndAwaitAdvance(); // Wait for next updateStatus.
+ clock.advance(Curator.ZK_SESSION_TIMEOUT);
+ singleton.phaser.register(); // Set up so we can synchronise with deactivation.
+ stunning.arriveAndDeregister(); // Let lease expire, and ensure further ticks complete if we lose the race to unregister.
+
+ singleton.phaser.arriveAndAwaitAdvance();
+ assertFalse(singleton.isActive);
+
+ // Singleton is reactivated next tick.
+ singleton.phaser.arriveAndAwaitAdvance();
+ assertTrue(singleton.isActive);
+
+ // Manager unregisters remaining singletons on shutdown.
+ curator.deconstruct();
+ singleton.phaser.arriveAndAwaitAdvance();
+ assertFalse(singleton.isActive);
+ }
+ }
+
+ @Test
+ public void testSingletonsInSameContainer() {
+ try (Curator wrapped = new MockCurator()) {
+ CuratorWrapper curator = new CuratorWrapper(wrapped);
+
+ // First singleton to register becomes active during construction.
+ Singleton singleton = new Singleton(curator);
+ assertTrue(singleton.isActive);
+ assertTrue(wrapped.exists(lockPath));
+ assertTrue(curator.isActive(singleton.id()));
+
+ Singleton newSingleton = new Singleton(curator);
+ assertTrue(newSingleton.isActive);
+ assertFalse(singleton.isActive);
+
+ Singleton newerSingleton = new Singleton(curator);
+ assertTrue(newerSingleton.isActive);
+ assertFalse(newSingleton.isActive);
+ assertFalse(singleton.isActive);
+
+ singleton.shutdown();
+ assertTrue(newerSingleton.isActive);
+ assertFalse(newSingleton.isActive);
+ assertFalse(singleton.isActive);
+
+ newerSingleton.shutdown();
+ assertFalse(newerSingleton.isActive);
+ assertTrue(newSingleton.isActive);
+ assertFalse(singleton.isActive);
+
+ // Add a singleton which fails activation.
+ Phaser stunning = new Phaser(2);
+ AtomicReference<String> thrownMessage = new AtomicReference<>();
+ new Thread(() -> {
+ RuntimeException e = assertThrows(RuntimeException.class,
+ () -> new Singleton(curator) {
+ @Override public void activate() {
+ throw new RuntimeException();
+ }
+ @Override public void deactivate() {
+ stunning.arriveAndAwaitAdvance();
+ stunning.arriveAndAwaitAdvance();
+ throw new RuntimeException();
+ }
+ @Override public String toString() {
+ return "failing singleton";
+ }
+ });
+ stunning.arriveAndAwaitAdvance();
+ thrownMessage.set(e.getMessage());
+ }).start();
+
+ stunning.arriveAndAwaitAdvance(); // Failing component is about to be deactivated.
+ assertFalse(newSingleton.isActive);
+ assertTrue(curator.isActive(newSingleton.id())); // No actual active components, but container has the lease.
+ stunning.arriveAndAwaitAdvance(); // Failing component is done being deactivated.
+ stunning.arriveAndAwaitAdvance(); // Failing component is done cleaning up after itself.
+ assertTrue(newSingleton.isActive);
+ assertEquals("failed to register failing singleton", thrownMessage.get());
+ newSingleton.shutdown();
+
+ curator.deconstruct();
+ }
+ }
+
+ @Test
+ public void testSingletonsInDifferentContainers() {
+ try (MockCurator wrapped = new MockCurator()) {
+ CuratorWrapper curator = new CuratorWrapper(wrapped, Clock.systemUTC(), Duration.ofMillis(100));
+
+ // Simulate a different container holding the lock.
+ Singleton singleton;
+ try (Lock lock = wrapped.lock(lockPath, Duration.ofSeconds(1))) {
+ singleton = new Singleton(curator);
+ assertFalse(singleton.isActive);
+ assertFalse(curator.isActive(singleton.id()));
+ singleton.phaser.register();
+ }
+
+ singleton.phaser.arriveAndAwaitAdvance();
+ assertTrue(curator.isActive(singleton.id()));
+ assertTrue(singleton.isActive);
+
+ // Simulate a different container wanting the lock.
+ Phaser stunning = new Phaser(2);
+ new Thread(() -> {
+ try (Lock lock = wrapped.lock(lockPath, Duration.ofSeconds(2))) {
+ stunning.arriveAndAwaitAdvance();
+ stunning.arriveAndAwaitAdvance();
+ }
+ }).start();
+
+ // Simulate connection loss for our singleton's ZK session.
+ ((MockCuratorFramework) wrapped.framework()).connectionStateListeners.listeners.forEach(listener -> listener.stateChanged(null, ConnectionState.LOST));
+ stunning.arriveAndAwaitAdvance();
+ singleton.phaser.arriveAndAwaitAdvance();
+ assertFalse(singleton.isActive);
+
+ // Connection is restored, and the other container releases the lock again.
+ stunning.arriveAndAwaitAdvance();
+ singleton.phaser.arriveAndAwaitAdvance();
+ assertTrue(singleton.isActive);
+ singleton.phaser.arriveAndDeregister();
+ singleton.shutdown();
+ assertFalse(singleton.isActive);
+
+ curator.deconstruct();
+ }
+ }
+
+ static class Singleton extends AbstractSingletonWorker {
+ Singleton(VespaCurator curator) { register(curator, Duration.ofSeconds(2)); }
+ boolean isActive;
+ Phaser phaser = new Phaser(1);
+ @Override public String id() { return "singleton"; } // ... lest anonymous subclasses get different IDs ... ƪ(`▿▿▿▿´ƪ)
+ @Override public void activate() { isActive = true; phaser.arriveAndAwaitAdvance(); }
+ @Override public void deactivate() { isActive = false; phaser.arriveAndAwaitAdvance(); }
+ public void shutdown() { unregister(Duration.ofSeconds(2)); }
+ }
+
+}
diff --git a/zookeeper-server/zookeeper-server/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java b/zookeeper-server/zookeeper-server/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java
index db643d76e0d..3a125d72a89 100644
--- a/zookeeper-server/zookeeper-server/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java
+++ b/zookeeper-server/zookeeper-server/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java
@@ -38,6 +38,9 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static org.junit.Assert.assertEquals;
+/**
+ * @author jonmv
+ */
public class VespaZooKeeperTest {
static final Path tempDirRoot = getTmpDir();
@@ -45,18 +48,17 @@ public class VespaZooKeeperTest {
/**
* Performs dynamic reconfiguration of ZooKeeper servers.
- *
+ * <p>
* First, a cluster of 3 servers is set up, and some data is written to it.
* Then, 3 new servers are added, and the first 3 marked for retirement;
* this should force the quorum to move the 3 new servers, but not disconnect the old ones.
* Next, the old servers are removed.
* Then, the cluster is reduced to size 1.
* Finally, the cluster grows to size 3 again.
- *
+ * <p>
* Throughout all of this, quorum should remain, and the data should remain the same.
*/
@Test(timeout = 120_000)
- @Ignore // Unstable, some ZK server keeps resetting connections sometimes.
public void testReconfiguration() throws ExecutionException, InterruptedException, IOException, KeeperException, TimeoutException {
List<ZooKeeper> keepers = new ArrayList<>();
for (int i = 0; i < 8; i++) keepers.add(new ZooKeeper());
@@ -126,7 +128,7 @@ public class VespaZooKeeperTest {
static String writeData(ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException {
try (ZooKeeperAdmin admin = createAdmin(config)) {
List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- String node = admin.create("/test-node", "hi".getBytes(UTF_8), acl, CreateMode.EPHEMERAL_SEQUENTIAL);
+ String node = admin.create("/test-node", "hi".getBytes(UTF_8), acl, CreateMode.PERSISTENT_SEQUENTIAL);
String read = new String(admin.getData(node, false, new Stat()), UTF_8);
assertEquals("hi", read);
return node;