diff options
author | Harald Musum <musum@verizonmedia.com> | 2022-09-28 20:36:32 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-28 20:36:32 +0200 |
commit | d50f7bd9c99ed9d8edeabb71825f3966f9cd6bd9 (patch) | |
tree | 1e62ce64ce4d9f4c78782d2dfc11a8486b7fbfe4 | |
parent | 04f06262381803317e2bdbf1dcfd20b7789270db (diff) | |
parent | 60796a65294ead8553edd149ce8756be778f8baa (diff) |
Merge pull request #24250 from vespa-engine/bjorncs/zk-ttl
Add optional TTL to persistent nodes created through Curator
3 files changed, 39 insertions, 17 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java index 859dcc2f93f..f159d471f55 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java @@ -1,9 +1,9 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.curator; -import com.yahoo.component.annotation.Inject; import com.yahoo.cloud.config.CuratorConfig; import com.yahoo.component.AbstractComponent; +import com.yahoo.component.annotation.Inject; import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.path.Path; import com.yahoo.vespa.curator.api.VespaCurator; @@ -14,6 +14,7 @@ import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.CreateBuilder; import org.apache.curator.framework.api.transaction.CuratorTransaction; import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; @@ -23,9 +24,11 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.EphemeralType; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import java.io.File; @@ -38,12 +41,10 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; @@ -210,17 +211,27 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos } } + /** @see #create(Path, Duration) */ + public boolean create(Path path) { return create(path, null); } + /** * Creates an empty node at a path, creating any parents as necessary. * If the node already exists nothing is done. * Returns whether a change was attempted. */ - public boolean create(Path path) { + public boolean create(Path path, Duration ttl) { if (exists(path)) return false; String absolutePath = path.getAbsolute(); try { - framework().create().creatingParentsIfNeeded().forPath(absolutePath, new byte[0]); + CreateBuilder b = framework().create(); + if (ttl != null) { + long millis = ttl.toMillis(); + if (millis <= 0 || millis > EphemeralType.TTL.maxValue()) + throw new IllegalArgumentException(ttl.toString()); + b.withTtl(millis).withMode(CreateMode.PERSISTENT_WITH_TTL); + } + b.creatingParentsIfNeeded().forPath(absolutePath, new byte[0]); } catch (org.apache.zookeeper.KeeperException.NodeExistsException e) { // Path created between exists() and create() call, do nothing } catch (Exception e) { diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MemoryFileSystem.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MemoryFileSystem.java index 689e81c2d94..3a48a1ec331 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MemoryFileSystem.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MemoryFileSystem.java @@ -117,6 +117,9 @@ class MemoryFileSystem extends FileSystem { /** The content of this node, never null. This buffer is effectively immutable. */ private byte[] content; + /** Optional TTL. Currently not in use. */ + private Long ttl; + private final AtomicInteger version = new AtomicInteger(0); private Map<String, Node> children = Collections.synchronizedMap(new LinkedHashMap<>()); @@ -140,6 +143,9 @@ class MemoryFileSystem extends FileSystem { this.version.incrementAndGet(); } + /** Set optional TTL */ + public void setTtl(long ttl) { this.ttl = ttl; } + public int version() { return version.get(); } /** diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java index 4cb510e7904..81fb24bd7e5 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java @@ -351,7 +351,7 @@ public class MockCuratorFramework implements CuratorFramework { // ----- Start of adaptor methods from Curator to the mock file system ----- /** Creates a node below the given directory root */ - private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, MemoryFileSystem.Node root, Listeners listeners) + private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, MemoryFileSystem.Node root, Listeners listeners, Long ttl) throws KeeperException.NodeExistsException, KeeperException.NoNodeException { validatePath(pathString); Path path = Path.fromString(pathString); @@ -364,7 +364,11 @@ public class MockCuratorFramework implements CuratorFramework { if (parent.children().containsKey(path.getName())) throw new KeeperException.NodeExistsException(path.toString()); - parent.add(name).setContent(content); + MemoryFileSystem.Node node = parent.add(name); + node.setContent(content); + if (List.of(CreateMode.PERSISTENT_WITH_TTL, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL).contains(createMode) + && ttl != null) + node.setTtl(ttl); String nodePath = "/" + path.getParentPath().toString() + "/" + name; listeners.notify(Path.fromString(nodePath), content, PathChildrenCacheEvent.Type.CHILD_ADDED); return nodePath; @@ -422,7 +426,7 @@ public class MockCuratorFramework implements CuratorFramework { private String nodeName(String baseName, CreateMode createMode) { switch (createMode) { - case PERSISTENT: case EPHEMERAL: return baseName; + case PERSISTENT: case EPHEMERAL: case PERSISTENT_WITH_TTL: return baseName; case PERSISTENT_SEQUENTIAL: case EPHEMERAL_SEQUENTIAL: return baseName + monotonicallyIncreasingNumber++; default: throw new UnsupportedOperationException(createMode + " support not implemented in MockCurator"); } @@ -837,6 +841,7 @@ public class MockCuratorFramework implements CuratorFramework { private boolean createParents = false; private CreateMode createMode = CreateMode.PERSISTENT; + private Long ttl; @Override public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentsIfNeeded() { @@ -845,12 +850,12 @@ public class MockCuratorFramework implements CuratorFramework { @Override public String forPath(String s, byte[] bytes) throws Exception { - return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners); + return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl); } @Override public String forPath(String s) throws Exception { - return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners); + return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl); } }; @@ -862,12 +867,12 @@ public class MockCuratorFramework implements CuratorFramework { @Override public String forPath(String s, byte[] bytes) throws Exception { - return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners); + return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl); } @Override public String forPath(String s) throws Exception { - return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners); + return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl); } }; @@ -885,11 +890,11 @@ public class MockCuratorFramework implements CuratorFramework { } public String forPath(String s) throws Exception { - return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners); + return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl); } public String forPath(String s, byte[] bytes) throws Exception { - return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners); + return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl); } @Override @@ -923,7 +928,7 @@ public class MockCuratorFramework implements CuratorFramework { } @Override - public CreateBuilderMain withTtl(long l) { return this; } + public CreateBuilderMain withTtl(long l) { this.ttl = l; return this; } @Override public CreateBuilder2 orSetData() { @@ -1283,13 +1288,13 @@ public class MockCuratorFramework implements CuratorFramework { @Override public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception { - createNode(s, bytes, false, createMode, newRoot, delayedListener); + createNode(s, bytes, false, createMode, newRoot, delayedListener, null); return new MockCuratorTransactionBridge(); } @Override public CuratorTransactionBridge forPath(String s) throws Exception { - createNode(s, new byte[0], false, createMode, newRoot, delayedListener); + createNode(s, new byte[0], false, createMode, newRoot, delayedListener, null); return new MockCuratorTransactionBridge(); } |