summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2022-09-28 20:36:32 +0200
committerGitHub <noreply@github.com>2022-09-28 20:36:32 +0200
commitd50f7bd9c99ed9d8edeabb71825f3966f9cd6bd9 (patch)
tree1e62ce64ce4d9f4c78782d2dfc11a8486b7fbfe4
parent04f06262381803317e2bdbf1dcfd20b7789270db (diff)
parent60796a65294ead8553edd149ce8756be778f8baa (diff)
Merge pull request #24250 from vespa-engine/bjorncs/zk-ttl
Add optional TTL to persistent nodes created through Curator
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java21
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MemoryFileSystem.java6
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java29
3 files changed, 39 insertions, 17 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
index 859dcc2f93f..f159d471f55 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
@@ -1,9 +1,9 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.curator;
-import com.yahoo.component.annotation.Inject;
import com.yahoo.cloud.config.CuratorConfig;
import com.yahoo.component.AbstractComponent;
+import com.yahoo.component.annotation.Inject;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.path.Path;
import com.yahoo.vespa.curator.api.VespaCurator;
@@ -14,6 +14,7 @@ import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
@@ -23,9 +24,11 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.EphemeralType;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import java.io.File;
@@ -38,12 +41,10 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -210,17 +211,27 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos
}
}
+ /** @see #create(Path, Duration) */
+ public boolean create(Path path) { return create(path, null); }
+
/**
* Creates an empty node at a path, creating any parents as necessary.
* If the node already exists nothing is done.
* Returns whether a change was attempted.
*/
- public boolean create(Path path) {
+ public boolean create(Path path, Duration ttl) {
if (exists(path)) return false;
String absolutePath = path.getAbsolute();
try {
- framework().create().creatingParentsIfNeeded().forPath(absolutePath, new byte[0]);
+ CreateBuilder b = framework().create();
+ if (ttl != null) {
+ long millis = ttl.toMillis();
+ if (millis <= 0 || millis > EphemeralType.TTL.maxValue())
+ throw new IllegalArgumentException(ttl.toString());
+ b.withTtl(millis).withMode(CreateMode.PERSISTENT_WITH_TTL);
+ }
+ b.creatingParentsIfNeeded().forPath(absolutePath, new byte[0]);
} catch (org.apache.zookeeper.KeeperException.NodeExistsException e) {
// Path created between exists() and create() call, do nothing
} catch (Exception e) {
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MemoryFileSystem.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MemoryFileSystem.java
index 689e81c2d94..3a48a1ec331 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MemoryFileSystem.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MemoryFileSystem.java
@@ -117,6 +117,9 @@ class MemoryFileSystem extends FileSystem {
/** The content of this node, never null. This buffer is effectively immutable. */
private byte[] content;
+ /** Optional TTL. Currently not in use. */
+ private Long ttl;
+
private final AtomicInteger version = new AtomicInteger(0);
private Map<String, Node> children = Collections.synchronizedMap(new LinkedHashMap<>());
@@ -140,6 +143,9 @@ class MemoryFileSystem extends FileSystem {
this.version.incrementAndGet();
}
+ /** Set optional TTL */
+ public void setTtl(long ttl) { this.ttl = ttl; }
+
public int version() { return version.get(); }
/**
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
index 4cb510e7904..81fb24bd7e5 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
@@ -351,7 +351,7 @@ public class MockCuratorFramework implements CuratorFramework {
// ----- Start of adaptor methods from Curator to the mock file system -----
/** Creates a node below the given directory root */
- private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, MemoryFileSystem.Node root, Listeners listeners)
+ private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, MemoryFileSystem.Node root, Listeners listeners, Long ttl)
throws KeeperException.NodeExistsException, KeeperException.NoNodeException {
validatePath(pathString);
Path path = Path.fromString(pathString);
@@ -364,7 +364,11 @@ public class MockCuratorFramework implements CuratorFramework {
if (parent.children().containsKey(path.getName()))
throw new KeeperException.NodeExistsException(path.toString());
- parent.add(name).setContent(content);
+ MemoryFileSystem.Node node = parent.add(name);
+ node.setContent(content);
+ if (List.of(CreateMode.PERSISTENT_WITH_TTL, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL).contains(createMode)
+ && ttl != null)
+ node.setTtl(ttl);
String nodePath = "/" + path.getParentPath().toString() + "/" + name;
listeners.notify(Path.fromString(nodePath), content, PathChildrenCacheEvent.Type.CHILD_ADDED);
return nodePath;
@@ -422,7 +426,7 @@ public class MockCuratorFramework implements CuratorFramework {
private String nodeName(String baseName, CreateMode createMode) {
switch (createMode) {
- case PERSISTENT: case EPHEMERAL: return baseName;
+ case PERSISTENT: case EPHEMERAL: case PERSISTENT_WITH_TTL: return baseName;
case PERSISTENT_SEQUENTIAL: case EPHEMERAL_SEQUENTIAL: return baseName + monotonicallyIncreasingNumber++;
default: throw new UnsupportedOperationException(createMode + " support not implemented in MockCurator");
}
@@ -837,6 +841,7 @@ public class MockCuratorFramework implements CuratorFramework {
private boolean createParents = false;
private CreateMode createMode = CreateMode.PERSISTENT;
+ private Long ttl;
@Override
public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentsIfNeeded() {
@@ -845,12 +850,12 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public String forPath(String s, byte[] bytes) throws Exception {
- return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners);
+ return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl);
}
@Override
public String forPath(String s) throws Exception {
- return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners);
+ return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl);
}
};
@@ -862,12 +867,12 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public String forPath(String s, byte[] bytes) throws Exception {
- return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners);
+ return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl);
}
@Override
public String forPath(String s) throws Exception {
- return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners);
+ return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl);
}
};
@@ -885,11 +890,11 @@ public class MockCuratorFramework implements CuratorFramework {
}
public String forPath(String s) throws Exception {
- return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners);
+ return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl);
}
public String forPath(String s, byte[] bytes) throws Exception {
- return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners);
+ return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl);
}
@Override
@@ -923,7 +928,7 @@ public class MockCuratorFramework implements CuratorFramework {
}
@Override
- public CreateBuilderMain withTtl(long l) { return this; }
+ public CreateBuilderMain withTtl(long l) { this.ttl = l; return this; }
@Override
public CreateBuilder2 orSetData() {
@@ -1283,13 +1288,13 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception {
- createNode(s, bytes, false, createMode, newRoot, delayedListener);
+ createNode(s, bytes, false, createMode, newRoot, delayedListener, null);
return new MockCuratorTransactionBridge();
}
@Override
public CuratorTransactionBridge forPath(String s) throws Exception {
- createNode(s, new byte[0], false, createMode, newRoot, delayedListener);
+ createNode(s, new byte[0], false, createMode, newRoot, delayedListener, null);
return new MockCuratorTransactionBridge();
}