diff options
author | jonmv <venstad@gmail.com> | 2022-10-07 10:45:03 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2022-10-10 12:33:57 +0200 |
commit | 40ef644020d4f1e1c4092e0903752b342f14adc2 (patch) | |
tree | a1018960605e828b9f9a3933150f863e96a3e7a0 /zkfacade | |
parent | d7a8d6e225297b81bc8769d308dd06084f87c19d (diff) |
Support more stat and version in Curator
Diffstat (limited to 'zkfacade')
-rw-r--r-- | zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java | 32 | ||||
-rw-r--r-- | zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java | 122 |
2 files changed, 110 insertions, 44 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java index f159d471f55..e096eee8a97 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java @@ -195,7 +195,11 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos * If the path and any of its parents does not exists they are created. */ // TODO: Use create().orSetData() in Curator 4 and later - public void set(Path path, byte[] data) { + public Stat set(Path path, byte[] data) { + return set(path, data, -1); + } + + public Stat set(Path path, byte[] data, int expectedVersion) { if (data.length > juteMaxBuffer) throw new IllegalArgumentException("Cannot not set data at " + path.getAbsolute() + ", " + data.length + " bytes is too much, max number of bytes allowed per node is " + juteMaxBuffer); @@ -205,12 +209,13 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos String absolutePath = path.getAbsolute(); try { - framework().setData().forPath(absolutePath, data); + return framework().setData().withVersion(expectedVersion).forPath(absolutePath, data); } catch (Exception e) { throw new RuntimeException("Could not set data at " + absolutePath, e); } } + /** @see #create(Path, Duration) */ public boolean create(Path path) { return create(path, null); } @@ -220,6 +225,9 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos * Returns whether a change was attempted. */ public boolean create(Path path, Duration ttl) { + return create(path, ttl, null); + } + private boolean create(Path path, Duration ttl, Stat stat) { if (exists(path)) return false; String absolutePath = path.getAbsolute(); @@ -231,7 +239,8 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos throw new IllegalArgumentException(ttl.toString()); b.withTtl(millis).withMode(CreateMode.PERSISTENT_WITH_TTL); } - b.creatingParentsIfNeeded().forPath(absolutePath, new byte[0]); + if (stat == null) b.creatingParentsIfNeeded() .forPath(absolutePath, new byte[0]); + else b.creatingParentsIfNeeded().storingStatIn(stat).forPath(absolutePath, new byte[0]); } catch (org.apache.zookeeper.KeeperException.NodeExistsException e) { // Path created between exists() and create() call, do nothing } catch (Exception e) { @@ -258,12 +267,16 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos } /** - * Deletes the given path and any children it may have. - * If the path does not exists nothing is done. + * Deletes the path and any children it may have. + * If the path does not exist, nothing is done. */ public void delete(Path path) { + delete(path, -1); + } + + public void delete(Path path, int expectedVersion) { try { - framework().delete().guaranteed().deletingChildrenIfNeeded().forPath(path.getAbsolute()); + framework().delete().guaranteed().deletingChildrenIfNeeded().withVersion(expectedVersion).forPath(path.getAbsolute()); } catch (KeeperException.NoNodeException e) { // Do nothing } catch (Exception e) { @@ -290,8 +303,13 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos * Empty is returned if the path does not exist. */ public Optional<byte[]> getData(Path path) { + return getData(path, null); + } + + Optional<byte[]> getData(Path path, Stat stat) { try { - return Optional.of(framework().getData().forPath(path.getAbsolute())); + return stat == null ? Optional.of(framework().getData() .forPath(path.getAbsolute())) + : Optional.of(framework().getData().storingStatIn(stat).forPath(path.getAbsolute())); } catch (KeeperException.NoNodeException e) { return Optional.empty(); diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java index 81fb24bd7e5..c8566015ea1 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java @@ -8,6 +8,7 @@ import com.yahoo.concurrent.UncheckedTimeoutException; import com.yahoo.path.Path; import com.yahoo.vespa.curator.CompletionTimeoutException; import com.yahoo.vespa.curator.Curator; +import com.yahoo.vespa.curator.mock.MemoryFileSystem.Node; import com.yahoo.vespa.curator.recipes.CuratorLockException; import org.apache.curator.CuratorZookeeperClient; import org.apache.curator.framework.CuratorFramework; @@ -83,6 +84,8 @@ import org.apache.curator.retry.RetryForever; import org.apache.curator.utils.EnsurePath; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.BadVersionException; +import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; @@ -124,6 +127,8 @@ public class MockCuratorFramework implements CuratorFramework { /** Listeners to changes to a particular path */ private final ListenerMap listeners = new ListenerMap(); + public final MockListenable<ConnectionStateListener> connectionStateListeners = new MockListenable<>(); + private CuratorFrameworkState curatorState = CuratorFrameworkState.LATENT; private int monotonicallyIncreasingNumber = 0; @@ -267,7 +272,7 @@ public class MockCuratorFramework implements CuratorFramework { @Override public Listenable<ConnectionStateListener> getConnectionStateListenable() { - return new MockListenable<>(); + return connectionStateListeners; } @Override @@ -351,7 +356,7 @@ public class MockCuratorFramework implements CuratorFramework { // ----- Start of adaptor methods from Curator to the mock file system ----- /** Creates a node below the given directory root */ - private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, MemoryFileSystem.Node root, Listeners listeners, Long ttl) + private String createNode(String pathString, byte[] content, boolean createParents, Stat stat, CreateMode createMode, MemoryFileSystem.Node root, Listeners listeners, Long ttl) throws KeeperException.NodeExistsException, KeeperException.NoNodeException { validatePath(pathString); Path path = Path.fromString(pathString); @@ -371,18 +376,21 @@ public class MockCuratorFramework implements CuratorFramework { node.setTtl(ttl); String nodePath = "/" + path.getParentPath().toString() + "/" + name; listeners.notify(Path.fromString(nodePath), content, PathChildrenCacheEvent.Type.CHILD_ADDED); + if (stat != null) stat.setVersion(node.version()); return nodePath; } /** Deletes a node below the given directory root */ - private void deleteNode(String pathString, boolean deleteChildren, MemoryFileSystem.Node root, Listeners listeners) - throws KeeperException.NoNodeException, KeeperException.NotEmptyException { + private void deleteNode(String pathString, boolean deleteChildren, int version, MemoryFileSystem.Node root, Listeners listeners) + throws KeeperException { validatePath(pathString); Path path = Path.fromString(pathString); MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false); if (parent == null) throw new KeeperException.NoNodeException(path.toString()); MemoryFileSystem.Node node = parent.children().get(path.getName()); if (node == null) throw new KeeperException.NoNodeException(path.getName() + " under " + parent); + if (version != -1 && version != node.version()) + throw new KeeperException.BadVersionException("expected version " + version + ", but was " + node.version()); if ( ! node.children().isEmpty() && ! deleteChildren) throw new KeeperException.NotEmptyException(path.toString()); parent.remove(path.getName()); @@ -390,16 +398,20 @@ public class MockCuratorFramework implements CuratorFramework { } /** Returns the data of a node */ - private byte[] getData(String pathString, MemoryFileSystem.Node root) throws KeeperException.NoNodeException { + private byte[] getData(String pathString, Stat stat, MemoryFileSystem.Node root) throws KeeperException.NoNodeException { validatePath(pathString); - return getNode(pathString, root).getContent(); + return getNode(pathString, stat, root).getContent(); } /** sets the data of an existing node */ - private void setData(String pathString, byte[] content, MemoryFileSystem.Node root, Listeners listeners) - throws KeeperException.NoNodeException { + private void setData(String pathString, byte[] content, int version, Stat stat, MemoryFileSystem.Node root, Listeners listeners) + throws KeeperException { validatePath(pathString); - getNode(pathString, root).setContent(content); + Node node = getNode(pathString, null, root); + if (version != -1 && version != node.version()) + throw new KeeperException.BadVersionException("expected version " + version + ", but was " + node.version()); + node.setContent(content); + if (stat != null) stat.setVersion(node.version()); listeners.notify(Path.fromString(pathString), content, PathChildrenCacheEvent.Type.CHILD_UPDATED); } @@ -414,13 +426,14 @@ public class MockCuratorFramework implements CuratorFramework { } /** Returns a node or throws the appropriate exception if it doesn't exist */ - private MemoryFileSystem.Node getNode(String pathString, MemoryFileSystem.Node root) throws KeeperException.NoNodeException { + private MemoryFileSystem.Node getNode(String pathString, Stat stat, MemoryFileSystem.Node root) throws KeeperException.NoNodeException { validatePath(pathString); Path path = Path.fromString(pathString); MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false); if (parent == null) throw new KeeperException.NoNodeException(path.toString()); MemoryFileSystem.Node node = parent.children().get(path.getName()); if (node == null) throw new KeeperException.NoNodeException(path.toString()); + if (stat != null) stat.setVersion(node.version()); return node; } @@ -783,6 +796,7 @@ public class MockCuratorFramework implements CuratorFramework { private abstract static class MockProtectACLCreateModeStatPathAndBytesable<String> implements ProtectACLCreateModeStatPathAndBytesable<String> { + Stat stat = null; public BackgroundPathAndBytesable<String> withACL(List<ACL> list) { throw new UnsupportedOperationException("Not implemented in MockCurator"); } @@ -832,6 +846,7 @@ public class MockCuratorFramework implements CuratorFramework { @Override public ACLBackgroundPathAndBytesable<String> storingStatIn(Stat stat) { + this.stat = stat; return this; } @@ -850,12 +865,12 @@ public class MockCuratorFramework implements CuratorFramework { @Override public String forPath(String s, byte[] bytes) throws Exception { - return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl); + return createNode(s, bytes, createParents, stat, createMode, fileSystem.root(), listeners, ttl); } @Override public String forPath(String s) throws Exception { - return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl); + return createNode(s, new byte[0], createParents, stat, createMode, fileSystem.root(), listeners, ttl); } }; @@ -867,12 +882,12 @@ public class MockCuratorFramework implements CuratorFramework { @Override public String forPath(String s, byte[] bytes) throws Exception { - return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl); + return createNode(s, bytes, createParents, stat, createMode, fileSystem.root(), listeners, ttl); } @Override public String forPath(String s) throws Exception { - return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl); + return createNode(s, new byte[0], createParents, stat, createMode, fileSystem.root(), listeners, ttl); } }; @@ -890,11 +905,11 @@ public class MockCuratorFramework implements CuratorFramework { } public String forPath(String s) throws Exception { - return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl); + return createNode(s, new byte[0], createParents, null, createMode, fileSystem.root(), listeners, ttl); } public String forPath(String s, byte[] bytes) throws Exception { - return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl); + return createNode(s, bytes, createParents, null, createMode, fileSystem.root(), listeners, ttl); } @Override @@ -1043,9 +1058,8 @@ public class MockCuratorFramework implements CuratorFramework { @Override public Stat forPath(String path) throws Exception { try { - MemoryFileSystem.Node node = getNode(path, fileSystem.root()); Stat stat = new Stat(); - stat.setVersion(node.version()); + getNode(path, stat, fileSystem.root()); return stat; } catch (KeeperException.NoNodeException e) { @@ -1067,6 +1081,7 @@ public class MockCuratorFramework implements CuratorFramework { private class MockDeleteBuilder extends MockBackgroundPathableBuilder<Void> implements DeleteBuilder { private boolean deleteChildren = false; + private int version = -1; @Override public BackgroundVersionable deletingChildrenIfNeeded() { @@ -1081,11 +1096,12 @@ public class MockCuratorFramework implements CuratorFramework { @Override public BackgroundPathable<Void> withVersion(int i) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); + version = i; + return this; } public Void forPath(String pathString) throws Exception { - deleteNode(pathString, deleteChildren, fileSystem.root(), listeners); + deleteNode(pathString, deleteChildren, version, fileSystem.root(), listeners); return null; } @@ -1107,7 +1123,7 @@ public class MockCuratorFramework implements CuratorFramework { } public byte[] forPath(String path) throws Exception { - return getData(path, fileSystem.root()); + return getData(path, null, fileSystem.root()); } @Override @@ -1142,13 +1158,34 @@ public class MockCuratorFramework implements CuratorFramework { @Override public WatchPathable<byte[]> storingStatIn(Stat stat) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); + return new WatchPathable<byte[]>() { + @Override + public byte[] forPath(String path) throws Exception { + return getData(path, stat, fileSystem.root()); + } + + @Override + public Pathable<byte[]> watched() { + return null; + } + + @Override + public Pathable<byte[]> usingWatcher(Watcher watcher) { + return null; + } + + @Override + public Pathable<byte[]> usingWatcher(CuratorWatcher watcher) { + return null; + } + }; } } // extends MockBackgroundACLPathAndBytesableBuilder<Stat> private class MockSetDataBuilder implements SetDataBuilder { + int version = -1; @Override public SetDataBackgroundVersionable compressed() { throw new UnsupportedOperationException("Not implemented in MockCurator"); @@ -1156,18 +1193,22 @@ public class MockCuratorFramework implements CuratorFramework { @Override public BackgroundPathAndBytesable<Stat> withVersion(int i) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); + version = i; + return this; } @Override public Stat forPath(String path, byte[] bytes) throws Exception { - setData(path, bytes, fileSystem.root(), listeners); - return null; + Stat stat = new Stat(); + setData(path, bytes, version, stat, fileSystem.root(), listeners); + return stat; } @Override - public Stat forPath(String s) throws Exception { - return null; + public Stat forPath(String path) throws Exception { + Stat stat = new Stat(); + setData(path, new byte[0], version, stat, fileSystem.root(), listeners); + return stat; } @Override @@ -1206,18 +1247,21 @@ public class MockCuratorFramework implements CuratorFramework { } /** Allows addition of directoryListeners which are never called */ - private static class MockListenable<T> implements Listenable<T> { + public static class MockListenable<T> implements Listenable<T> { + + public final List<T> listeners = new ArrayList<>(); @Override public void addListener(T t) { + listeners.add(t); } @Override - public void addListener(T t, Executor executor) { - } + public void addListener(T t, Executor executor) { throw new UnsupportedOperationException("not supported in mock curator"); } @Override public void removeListener(T t) { + listeners.remove(t); } } @@ -1288,13 +1332,13 @@ public class MockCuratorFramework implements CuratorFramework { @Override public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception { - createNode(s, bytes, false, createMode, newRoot, delayedListener, null); + createNode(s, bytes, false, null, createMode, newRoot, delayedListener, null); return new MockCuratorTransactionBridge(); } @Override public CuratorTransactionBridge forPath(String s) throws Exception { - createNode(s, new byte[0], false, createMode, newRoot, delayedListener, null); + createNode(s, new byte[0], false, null, createMode, newRoot, delayedListener, null); return new MockCuratorTransactionBridge(); } @@ -1316,14 +1360,16 @@ public class MockCuratorFramework implements CuratorFramework { private class MockTransactionDeleteBuilder implements TransactionDeleteBuilder<CuratorTransactionBridge> { + int version = -1; @Override public Pathable<CuratorTransactionBridge> withVersion(int i) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); + version = i; + return this; } @Override public CuratorTransactionBridge forPath(String path) throws Exception { - deleteNode(path, false, newRoot, delayedListener); + deleteNode(path, false, version, newRoot, delayedListener); return new MockCuratorTransactionBridge(); } @@ -1331,6 +1377,7 @@ public class MockCuratorFramework implements CuratorFramework { private class MockTransactionSetDataBuilder implements TransactionSetDataBuilder<CuratorTransactionBridge> { + int version = -1; @Override public VersionPathAndBytesable<CuratorTransactionBridge> compressed() { throw new UnsupportedOperationException("Not implemented in MockCurator"); @@ -1338,18 +1385,19 @@ public class MockCuratorFramework implements CuratorFramework { @Override public PathAndBytesable<CuratorTransactionBridge> withVersion(int i) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); + version = i; + return this; } @Override public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception { - MockCuratorFramework.this.setData(s, bytes, newRoot, delayedListener); + MockCuratorFramework.this.setData(s, bytes, version, null, newRoot, delayedListener); return new MockCuratorTransactionBridge(); } @Override public CuratorTransactionBridge forPath(String s) throws Exception { - MockCuratorFramework.this.setData(s, new byte[0], newRoot, delayedListener); + MockCuratorFramework.this.setData(s, new byte[0], version, null, newRoot, delayedListener); return new MockCuratorTransactionBridge(); } |