summaryrefslogtreecommitdiffstats
path: root/zkfacade
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2022-10-07 10:45:03 +0200
committerjonmv <venstad@gmail.com>2022-10-10 12:33:57 +0200
commit40ef644020d4f1e1c4092e0903752b342f14adc2 (patch)
treea1018960605e828b9f9a3933150f863e96a3e7a0 /zkfacade
parentd7a8d6e225297b81bc8769d308dd06084f87c19d (diff)
Support more stat and version in Curator
Diffstat (limited to 'zkfacade')
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java32
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java122
2 files changed, 110 insertions, 44 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
index f159d471f55..e096eee8a97 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
@@ -195,7 +195,11 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos
* If the path and any of its parents does not exists they are created.
*/
// TODO: Use create().orSetData() in Curator 4 and later
- public void set(Path path, byte[] data) {
+ public Stat set(Path path, byte[] data) {
+ return set(path, data, -1);
+ }
+
+ public Stat set(Path path, byte[] data, int expectedVersion) {
if (data.length > juteMaxBuffer)
throw new IllegalArgumentException("Cannot not set data at " + path.getAbsolute() + ", " +
data.length + " bytes is too much, max number of bytes allowed per node is " + juteMaxBuffer);
@@ -205,12 +209,13 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos
String absolutePath = path.getAbsolute();
try {
- framework().setData().forPath(absolutePath, data);
+ return framework().setData().withVersion(expectedVersion).forPath(absolutePath, data);
} catch (Exception e) {
throw new RuntimeException("Could not set data at " + absolutePath, e);
}
}
+
/** @see #create(Path, Duration) */
public boolean create(Path path) { return create(path, null); }
@@ -220,6 +225,9 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos
* Returns whether a change was attempted.
*/
public boolean create(Path path, Duration ttl) {
+ return create(path, ttl, null);
+ }
+ private boolean create(Path path, Duration ttl, Stat stat) {
if (exists(path)) return false;
String absolutePath = path.getAbsolute();
@@ -231,7 +239,8 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos
throw new IllegalArgumentException(ttl.toString());
b.withTtl(millis).withMode(CreateMode.PERSISTENT_WITH_TTL);
}
- b.creatingParentsIfNeeded().forPath(absolutePath, new byte[0]);
+ if (stat == null) b.creatingParentsIfNeeded() .forPath(absolutePath, new byte[0]);
+ else b.creatingParentsIfNeeded().storingStatIn(stat).forPath(absolutePath, new byte[0]);
} catch (org.apache.zookeeper.KeeperException.NodeExistsException e) {
// Path created between exists() and create() call, do nothing
} catch (Exception e) {
@@ -258,12 +267,16 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos
}
/**
- * Deletes the given path and any children it may have.
- * If the path does not exists nothing is done.
+ * Deletes the path and any children it may have.
+ * If the path does not exist, nothing is done.
*/
public void delete(Path path) {
+ delete(path, -1);
+ }
+
+ public void delete(Path path, int expectedVersion) {
try {
- framework().delete().guaranteed().deletingChildrenIfNeeded().forPath(path.getAbsolute());
+ framework().delete().guaranteed().deletingChildrenIfNeeded().withVersion(expectedVersion).forPath(path.getAbsolute());
} catch (KeeperException.NoNodeException e) {
// Do nothing
} catch (Exception e) {
@@ -290,8 +303,13 @@ public class Curator extends AbstractComponent implements VespaCurator, AutoClos
* Empty is returned if the path does not exist.
*/
public Optional<byte[]> getData(Path path) {
+ return getData(path, null);
+ }
+
+ Optional<byte[]> getData(Path path, Stat stat) {
try {
- return Optional.of(framework().getData().forPath(path.getAbsolute()));
+ return stat == null ? Optional.of(framework().getData() .forPath(path.getAbsolute()))
+ : Optional.of(framework().getData().storingStatIn(stat).forPath(path.getAbsolute()));
}
catch (KeeperException.NoNodeException e) {
return Optional.empty();
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
index 81fb24bd7e5..c8566015ea1 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
@@ -8,6 +8,7 @@ import com.yahoo.concurrent.UncheckedTimeoutException;
import com.yahoo.path.Path;
import com.yahoo.vespa.curator.CompletionTimeoutException;
import com.yahoo.vespa.curator.Curator;
+import com.yahoo.vespa.curator.mock.MemoryFileSystem.Node;
import com.yahoo.vespa.curator.recipes.CuratorLockException;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.framework.CuratorFramework;
@@ -83,6 +84,8 @@ import org.apache.curator.retry.RetryForever;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.BadVersionException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
@@ -124,6 +127,8 @@ public class MockCuratorFramework implements CuratorFramework {
/** Listeners to changes to a particular path */
private final ListenerMap listeners = new ListenerMap();
+ public final MockListenable<ConnectionStateListener> connectionStateListeners = new MockListenable<>();
+
private CuratorFrameworkState curatorState = CuratorFrameworkState.LATENT;
private int monotonicallyIncreasingNumber = 0;
@@ -267,7 +272,7 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public Listenable<ConnectionStateListener> getConnectionStateListenable() {
- return new MockListenable<>();
+ return connectionStateListeners;
}
@Override
@@ -351,7 +356,7 @@ public class MockCuratorFramework implements CuratorFramework {
// ----- Start of adaptor methods from Curator to the mock file system -----
/** Creates a node below the given directory root */
- private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, MemoryFileSystem.Node root, Listeners listeners, Long ttl)
+ private String createNode(String pathString, byte[] content, boolean createParents, Stat stat, CreateMode createMode, MemoryFileSystem.Node root, Listeners listeners, Long ttl)
throws KeeperException.NodeExistsException, KeeperException.NoNodeException {
validatePath(pathString);
Path path = Path.fromString(pathString);
@@ -371,18 +376,21 @@ public class MockCuratorFramework implements CuratorFramework {
node.setTtl(ttl);
String nodePath = "/" + path.getParentPath().toString() + "/" + name;
listeners.notify(Path.fromString(nodePath), content, PathChildrenCacheEvent.Type.CHILD_ADDED);
+ if (stat != null) stat.setVersion(node.version());
return nodePath;
}
/** Deletes a node below the given directory root */
- private void deleteNode(String pathString, boolean deleteChildren, MemoryFileSystem.Node root, Listeners listeners)
- throws KeeperException.NoNodeException, KeeperException.NotEmptyException {
+ private void deleteNode(String pathString, boolean deleteChildren, int version, MemoryFileSystem.Node root, Listeners listeners)
+ throws KeeperException {
validatePath(pathString);
Path path = Path.fromString(pathString);
MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false);
if (parent == null) throw new KeeperException.NoNodeException(path.toString());
MemoryFileSystem.Node node = parent.children().get(path.getName());
if (node == null) throw new KeeperException.NoNodeException(path.getName() + " under " + parent);
+ if (version != -1 && version != node.version())
+ throw new KeeperException.BadVersionException("expected version " + version + ", but was " + node.version());
if ( ! node.children().isEmpty() && ! deleteChildren)
throw new KeeperException.NotEmptyException(path.toString());
parent.remove(path.getName());
@@ -390,16 +398,20 @@ public class MockCuratorFramework implements CuratorFramework {
}
/** Returns the data of a node */
- private byte[] getData(String pathString, MemoryFileSystem.Node root) throws KeeperException.NoNodeException {
+ private byte[] getData(String pathString, Stat stat, MemoryFileSystem.Node root) throws KeeperException.NoNodeException {
validatePath(pathString);
- return getNode(pathString, root).getContent();
+ return getNode(pathString, stat, root).getContent();
}
/** sets the data of an existing node */
- private void setData(String pathString, byte[] content, MemoryFileSystem.Node root, Listeners listeners)
- throws KeeperException.NoNodeException {
+ private void setData(String pathString, byte[] content, int version, Stat stat, MemoryFileSystem.Node root, Listeners listeners)
+ throws KeeperException {
validatePath(pathString);
- getNode(pathString, root).setContent(content);
+ Node node = getNode(pathString, null, root);
+ if (version != -1 && version != node.version())
+ throw new KeeperException.BadVersionException("expected version " + version + ", but was " + node.version());
+ node.setContent(content);
+ if (stat != null) stat.setVersion(node.version());
listeners.notify(Path.fromString(pathString), content, PathChildrenCacheEvent.Type.CHILD_UPDATED);
}
@@ -414,13 +426,14 @@ public class MockCuratorFramework implements CuratorFramework {
}
/** Returns a node or throws the appropriate exception if it doesn't exist */
- private MemoryFileSystem.Node getNode(String pathString, MemoryFileSystem.Node root) throws KeeperException.NoNodeException {
+ private MemoryFileSystem.Node getNode(String pathString, Stat stat, MemoryFileSystem.Node root) throws KeeperException.NoNodeException {
validatePath(pathString);
Path path = Path.fromString(pathString);
MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false);
if (parent == null) throw new KeeperException.NoNodeException(path.toString());
MemoryFileSystem.Node node = parent.children().get(path.getName());
if (node == null) throw new KeeperException.NoNodeException(path.toString());
+ if (stat != null) stat.setVersion(node.version());
return node;
}
@@ -783,6 +796,7 @@ public class MockCuratorFramework implements CuratorFramework {
private abstract static class MockProtectACLCreateModeStatPathAndBytesable<String>
implements ProtectACLCreateModeStatPathAndBytesable<String> {
+ Stat stat = null;
public BackgroundPathAndBytesable<String> withACL(List<ACL> list) {
throw new UnsupportedOperationException("Not implemented in MockCurator");
}
@@ -832,6 +846,7 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public ACLBackgroundPathAndBytesable<String> storingStatIn(Stat stat) {
+ this.stat = stat;
return this;
}
@@ -850,12 +865,12 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public String forPath(String s, byte[] bytes) throws Exception {
- return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl);
+ return createNode(s, bytes, createParents, stat, createMode, fileSystem.root(), listeners, ttl);
}
@Override
public String forPath(String s) throws Exception {
- return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl);
+ return createNode(s, new byte[0], createParents, stat, createMode, fileSystem.root(), listeners, ttl);
}
};
@@ -867,12 +882,12 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public String forPath(String s, byte[] bytes) throws Exception {
- return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl);
+ return createNode(s, bytes, createParents, stat, createMode, fileSystem.root(), listeners, ttl);
}
@Override
public String forPath(String s) throws Exception {
- return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl);
+ return createNode(s, new byte[0], createParents, stat, createMode, fileSystem.root(), listeners, ttl);
}
};
@@ -890,11 +905,11 @@ public class MockCuratorFramework implements CuratorFramework {
}
public String forPath(String s) throws Exception {
- return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners, ttl);
+ return createNode(s, new byte[0], createParents, null, createMode, fileSystem.root(), listeners, ttl);
}
public String forPath(String s, byte[] bytes) throws Exception {
- return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners, ttl);
+ return createNode(s, bytes, createParents, null, createMode, fileSystem.root(), listeners, ttl);
}
@Override
@@ -1043,9 +1058,8 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public Stat forPath(String path) throws Exception {
try {
- MemoryFileSystem.Node node = getNode(path, fileSystem.root());
Stat stat = new Stat();
- stat.setVersion(node.version());
+ getNode(path, stat, fileSystem.root());
return stat;
}
catch (KeeperException.NoNodeException e) {
@@ -1067,6 +1081,7 @@ public class MockCuratorFramework implements CuratorFramework {
private class MockDeleteBuilder extends MockBackgroundPathableBuilder<Void> implements DeleteBuilder {
private boolean deleteChildren = false;
+ private int version = -1;
@Override
public BackgroundVersionable deletingChildrenIfNeeded() {
@@ -1081,11 +1096,12 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public BackgroundPathable<Void> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
+ version = i;
+ return this;
}
public Void forPath(String pathString) throws Exception {
- deleteNode(pathString, deleteChildren, fileSystem.root(), listeners);
+ deleteNode(pathString, deleteChildren, version, fileSystem.root(), listeners);
return null;
}
@@ -1107,7 +1123,7 @@ public class MockCuratorFramework implements CuratorFramework {
}
public byte[] forPath(String path) throws Exception {
- return getData(path, fileSystem.root());
+ return getData(path, null, fileSystem.root());
}
@Override
@@ -1142,13 +1158,34 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public WatchPathable<byte[]> storingStatIn(Stat stat) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
+ return new WatchPathable<byte[]>() {
+ @Override
+ public byte[] forPath(String path) throws Exception {
+ return getData(path, stat, fileSystem.root());
+ }
+
+ @Override
+ public Pathable<byte[]> watched() {
+ return null;
+ }
+
+ @Override
+ public Pathable<byte[]> usingWatcher(Watcher watcher) {
+ return null;
+ }
+
+ @Override
+ public Pathable<byte[]> usingWatcher(CuratorWatcher watcher) {
+ return null;
+ }
+ };
}
}
// extends MockBackgroundACLPathAndBytesableBuilder<Stat>
private class MockSetDataBuilder implements SetDataBuilder {
+ int version = -1;
@Override
public SetDataBackgroundVersionable compressed() {
throw new UnsupportedOperationException("Not implemented in MockCurator");
@@ -1156,18 +1193,22 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public BackgroundPathAndBytesable<Stat> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
+ version = i;
+ return this;
}
@Override
public Stat forPath(String path, byte[] bytes) throws Exception {
- setData(path, bytes, fileSystem.root(), listeners);
- return null;
+ Stat stat = new Stat();
+ setData(path, bytes, version, stat, fileSystem.root(), listeners);
+ return stat;
}
@Override
- public Stat forPath(String s) throws Exception {
- return null;
+ public Stat forPath(String path) throws Exception {
+ Stat stat = new Stat();
+ setData(path, new byte[0], version, stat, fileSystem.root(), listeners);
+ return stat;
}
@Override
@@ -1206,18 +1247,21 @@ public class MockCuratorFramework implements CuratorFramework {
}
/** Allows addition of directoryListeners which are never called */
- private static class MockListenable<T> implements Listenable<T> {
+ public static class MockListenable<T> implements Listenable<T> {
+
+ public final List<T> listeners = new ArrayList<>();
@Override
public void addListener(T t) {
+ listeners.add(t);
}
@Override
- public void addListener(T t, Executor executor) {
- }
+ public void addListener(T t, Executor executor) { throw new UnsupportedOperationException("not supported in mock curator"); }
@Override
public void removeListener(T t) {
+ listeners.remove(t);
}
}
@@ -1288,13 +1332,13 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception {
- createNode(s, bytes, false, createMode, newRoot, delayedListener, null);
+ createNode(s, bytes, false, null, createMode, newRoot, delayedListener, null);
return new MockCuratorTransactionBridge();
}
@Override
public CuratorTransactionBridge forPath(String s) throws Exception {
- createNode(s, new byte[0], false, createMode, newRoot, delayedListener, null);
+ createNode(s, new byte[0], false, null, createMode, newRoot, delayedListener, null);
return new MockCuratorTransactionBridge();
}
@@ -1316,14 +1360,16 @@ public class MockCuratorFramework implements CuratorFramework {
private class MockTransactionDeleteBuilder implements TransactionDeleteBuilder<CuratorTransactionBridge> {
+ int version = -1;
@Override
public Pathable<CuratorTransactionBridge> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
+ version = i;
+ return this;
}
@Override
public CuratorTransactionBridge forPath(String path) throws Exception {
- deleteNode(path, false, newRoot, delayedListener);
+ deleteNode(path, false, version, newRoot, delayedListener);
return new MockCuratorTransactionBridge();
}
@@ -1331,6 +1377,7 @@ public class MockCuratorFramework implements CuratorFramework {
private class MockTransactionSetDataBuilder implements TransactionSetDataBuilder<CuratorTransactionBridge> {
+ int version = -1;
@Override
public VersionPathAndBytesable<CuratorTransactionBridge> compressed() {
throw new UnsupportedOperationException("Not implemented in MockCurator");
@@ -1338,18 +1385,19 @@ public class MockCuratorFramework implements CuratorFramework {
@Override
public PathAndBytesable<CuratorTransactionBridge> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
+ version = i;
+ return this;
}
@Override
public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception {
- MockCuratorFramework.this.setData(s, bytes, newRoot, delayedListener);
+ MockCuratorFramework.this.setData(s, bytes, version, null, newRoot, delayedListener);
return new MockCuratorTransactionBridge();
}
@Override
public CuratorTransactionBridge forPath(String s) throws Exception {
- MockCuratorFramework.this.setData(s, new byte[0], newRoot, delayedListener);
+ MockCuratorFramework.this.setData(s, new byte[0], version, null, newRoot, delayedListener);
return new MockCuratorTransactionBridge();
}