summaryrefslogtreecommitdiffstats
path: root/zkfacade
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2020-11-20 11:03:07 +0100
committerMartin Polden <mpolden@mpolden.no>2020-11-20 11:03:07 +0100
commit607dbfedf2fbf8a2bddb5ffb1e23c796eb92dd5f (patch)
tree8e9e2a1b3019b3496a30a4d18cc729c7630391e8 /zkfacade
parenta7f509b84b99c5b18c147ad81f0a5b06f09507e4 (diff)
Refactor mock to simplify Curator constructors
Diffstat (limited to 'zkfacade')
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java31
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java1158
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java1169
3 files changed, 1195 insertions, 1163 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
index e281cb01f74..127eeba71e1 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
@@ -31,6 +31,7 @@ import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -57,7 +58,7 @@ public class Curator implements AutoCloseable {
private static final int MAX_RETRIES = 10;
private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES);
- protected final RetryPolicy retryPolicy;
+ protected final RetryPolicy retryPolicy = DEFAULT_RETRY_POLICY;
private final CuratorFramework curatorFramework;
private final ConnectionSpec connectionSpec;
@@ -94,36 +95,28 @@ public class Curator implements AutoCloseable {
Optional.of(ZK_CLIENT_CONFIG_FILE));
}
- protected Curator(String connectionSpec,
- String zooKeeperEnsembleConnectionSpec,
- Function<RetryPolicy, CuratorFramework> curatorFactory) {
- this(ConnectionSpec.create(connectionSpec, zooKeeperEnsembleConnectionSpec), curatorFactory, DEFAULT_RETRY_POLICY);
+ protected Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Function<RetryPolicy, CuratorFramework> curatorFactory) {
+ this(ConnectionSpec.create(connectionSpec, zooKeeperEnsembleConnectionSpec), curatorFactory.apply(DEFAULT_RETRY_POLICY));
}
Curator(ConnectionSpec connectionSpec, Optional<File> clientConfigFile) {
this(connectionSpec,
- (retryPolicy) -> CuratorFrameworkFactory
+ CuratorFrameworkFactory
.builder()
- .retryPolicy(retryPolicy)
+ .retryPolicy(DEFAULT_RETRY_POLICY)
.sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis())
.connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis())
.connectString(connectionSpec.local())
.zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile)))
.dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8
- .build(),
- DEFAULT_RETRY_POLICY);
+ .build());
}
- private Curator(ConnectionSpec connectionSpec,
- Function<RetryPolicy, CuratorFramework> curatorFactory,
- RetryPolicy retryPolicy) {
- this.connectionSpec = connectionSpec;
- this.retryPolicy = retryPolicy;
- this.curatorFramework = curatorFactory.apply(retryPolicy);
- if (this.curatorFramework != null) {
- addLoggingListener();
- curatorFramework.start();
- }
+ private Curator(ConnectionSpec connectionSpec, CuratorFramework curatorFramework) {
+ this.connectionSpec = Objects.requireNonNull(connectionSpec);
+ this.curatorFramework = Objects.requireNonNull(curatorFramework);
+ addLoggingListener();
+ curatorFramework.start();
}
private static ZKClientConfig createClientConfig(Optional<File> clientConfigFile) {
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
index 8e3b433354d..26f1c336874 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
@@ -1,94 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.curator.mock;
-import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.google.inject.Inject;
-import com.yahoo.collections.Pair;
-import com.yahoo.concurrent.Lock;
-import com.yahoo.concurrent.Locks;
import com.yahoo.path.Path;
-import com.yahoo.vespa.curator.CompletionTimeoutException;
import com.yahoo.vespa.curator.Curator;
-import com.yahoo.vespa.curator.recipes.CuratorLockException;
-import org.apache.curator.CuratorZookeeperClient;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
-import org.apache.curator.framework.api.ACLCreateModeBackgroundPathAndBytesable;
-import org.apache.curator.framework.api.ACLCreateModePathAndBytesable;
-import org.apache.curator.framework.api.ACLPathAndBytesable;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.BackgroundPathAndBytesable;
-import org.apache.curator.framework.api.BackgroundPathable;
-import org.apache.curator.framework.api.BackgroundVersionable;
-import org.apache.curator.framework.api.ChildrenDeletable;
-import org.apache.curator.framework.api.CreateBackgroundModeACLable;
-import org.apache.curator.framework.api.CreateBuilder;
-import org.apache.curator.framework.api.CuratorListener;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.api.DeleteBuilder;
-import org.apache.curator.framework.api.ErrorListenerPathAndBytesable;
-import org.apache.curator.framework.api.ErrorListenerPathable;
-import org.apache.curator.framework.api.ExistsBuilder;
-import org.apache.curator.framework.api.ExistsBuilderMain;
-import org.apache.curator.framework.api.GetACLBuilder;
-import org.apache.curator.framework.api.GetChildrenBuilder;
-import org.apache.curator.framework.api.GetDataBuilder;
-import org.apache.curator.framework.api.GetDataWatchBackgroundStatable;
-import org.apache.curator.framework.api.PathAndBytesable;
-import org.apache.curator.framework.api.Pathable;
-import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
-import org.apache.curator.framework.api.SetACLBuilder;
-import org.apache.curator.framework.api.SetDataBackgroundVersionable;
-import org.apache.curator.framework.api.SetDataBuilder;
-import org.apache.curator.framework.api.SyncBuilder;
-import org.apache.curator.framework.api.UnhandledErrorListener;
-import org.apache.curator.framework.api.VersionPathAndBytesable;
-import org.apache.curator.framework.api.WatchPathable;
-import org.apache.curator.framework.api.Watchable;
-import org.apache.curator.framework.api.transaction.CuratorTransaction;
-import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
-import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
-import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
-import org.apache.curator.framework.api.transaction.TransactionCheckBuilder;
-import org.apache.curator.framework.api.transaction.TransactionCreateBuilder;
-import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder;
-import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.recipes.atomic.AtomicStats;
-import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.NodeCacheListener;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.utils.EnsurePath;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import java.nio.file.Paths;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static com.yahoo.vespa.curator.mock.MemoryFileSystem.Node;
/**
* <p>A <b>non thread safe</b> mock of the curator API.
@@ -105,24 +25,7 @@ import static com.yahoo.vespa.curator.mock.MemoryFileSystem.Node;
*/
public class MockCurator extends Curator {
- public boolean timeoutOnLock = false;
- public boolean throwExceptionOnLock = false;
- private boolean shouldTimeoutOnEnter = false;
- private int monotonicallyIncreasingNumber = 0;
- private final boolean stableOrdering;
private String zooKeeperEnsembleConnectionSpec = "";
- private final Locks<String> locks = new Locks<>(Long.MAX_VALUE, TimeUnit.DAYS);
-
- /** The file system used by this mock to store zookeeper files and directories */
- private final MemoryFileSystem fileSystem = new MemoryFileSystem();
-
- /** Atomic counters. A more accurate mock would store these as files in the file system */
- private final Map<String, MockAtomicCounter> atomicCounters = new ConcurrentHashMap<>();
-
- /** Listeners to changes to a particular path */
- private final ListenerMap listeners = new ListenerMap();
-
- private final CuratorFramework curatorFramework;
/** Creates a mock curator with stable ordering */
@Inject
@@ -137,26 +40,24 @@ public class MockCurator extends Curator {
* This is not what ZooKeeper does.
*/
public MockCurator(boolean stableOrdering) {
- super("host1:10001", "host1:10001", (retryPolicy) -> null);
- this.stableOrdering = stableOrdering;
- curatorFramework = new MockCuratorFramework();
- curatorFramework.start();
+ super("host1:2181", "host1:2181", (retryPolicy) -> new MockCuratorFramework(stableOrdering, false));
+ }
+
+ private MockCuratorFramework mockFramework() {
+ return (MockCuratorFramework) super.framework();
}
/**
* Lists the entire content of this curator instance as a multiline string.
* Useful for debugging.
*/
- public String dumpState() { return fileSystem.dumpState(); }
-
- /** Returns a started curator framework */
- public CuratorFramework framework() { return curatorFramework; }
+ public String dumpState() { return mockFramework().fileSystem().dumpState(); }
/** Returns an atomic counter in this, or empty if no such counter is created */
public Optional<DistributedAtomicLong> counter(String path) {
- return Optional.ofNullable(atomicCounters.get(path));
+ return Optional.ofNullable(mockFramework().atomicCounters().get(path));
}
-
+
/**
* Sets the ZooKeeper ensemble connection spec, which must be on the form
* host1:port,host2:port ...
@@ -170,1068 +71,37 @@ public class MockCurator extends Curator {
return zooKeeperEnsembleConnectionSpec;
}
- // ----- Start of adaptor methods from Curator to the mock file system -----
-
- /** Creates a node below the given directory root */
- private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, Node root, Listeners listeners)
- throws KeeperException.NodeExistsException, KeeperException.NoNodeException {
- validatePath(pathString);
- Path path = Path.fromString(pathString);
- if (path.isRoot()) return "/"; // the root already exists
- Node parent = root.getNode(Paths.get(path.getParentPath().toString()), createParents);
- String name = nodeName(path.getName(), createMode);
-
- if (parent == null)
- throw new KeeperException.NoNodeException(path.getParentPath().toString());
- if (parent.children().containsKey(path.getName()))
- throw new KeeperException.NodeExistsException(path.toString());
-
- parent.add(name).setContent(content);
- String nodePath = "/" + path.getParentPath().toString() + "/" + name;
- listeners.notify(Path.fromString(nodePath), content, PathChildrenCacheEvent.Type.CHILD_ADDED);
- return nodePath;
- }
-
- /** Deletes a node below the given directory root */
- private void deleteNode(String pathString, boolean deleteChildren, Node root, Listeners listeners)
- throws KeeperException.NoNodeException, KeeperException.NotEmptyException {
- validatePath(pathString);
- Path path = Path.fromString(pathString);
- Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false);
- if (parent == null) throw new KeeperException.NoNodeException(path.toString());
- Node node = parent.children().get(path.getName());
- if (node == null) throw new KeeperException.NoNodeException(path.getName() + " under " + parent);
- if ( ! node.children().isEmpty() && ! deleteChildren)
- throw new KeeperException.NotEmptyException(path.toString());
- parent.remove(path.getName());
- listeners.notify(path, new byte[0], PathChildrenCacheEvent.Type.CHILD_REMOVED);
- }
-
- /** Returns the data of a node */
- private byte[] getData(String pathString, Node root) throws KeeperException.NoNodeException {
- validatePath(pathString);
- return getNode(pathString, root).getContent();
- }
-
- /** sets the data of an existing node */
- private void setData(String pathString, byte[] content, Node root, Listeners listeners)
- throws KeeperException.NoNodeException {
- validatePath(pathString);
- getNode(pathString, root).setContent(content);
- listeners.notify(Path.fromString(pathString), content, PathChildrenCacheEvent.Type.CHILD_UPDATED);
- }
-
- private List<String> getChildren(String path, Node root) throws KeeperException.NoNodeException {
- validatePath(path);
- Node node = root.getNode(Paths.get(path), false);
- if (node == null) throw new KeeperException.NoNodeException(path);
- List<String> children = new ArrayList<>(node.children().keySet());
- if (! stableOrdering)
- Collections.shuffle(children);
- return children;
- }
-
- private boolean exists(String path, Node root) {
- validatePath(path);
- Node parent = root.getNode(Paths.get(Path.fromString(path).getParentPath().toString()), false);
- if (parent == null) return false;
- Node node = parent.children().get(Path.fromString(path).getName());
- return node != null;
- }
-
- /** Returns a node or throws the appropriate exception if it doesn't exist */
- private Node getNode(String pathString, Node root) throws KeeperException.NoNodeException {
- validatePath(pathString);
- Path path = Path.fromString(pathString);
- Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false);
- if (parent == null) throw new KeeperException.NoNodeException(path.toString());
- Node node = parent.children().get(path.getName());
- if (node == null) throw new KeeperException.NoNodeException(path.toString());
- return node;
- }
-
- private String nodeName(String baseName, CreateMode createMode) {
- switch (createMode) {
- case PERSISTENT: case EPHEMERAL: return baseName;
- case PERSISTENT_SEQUENTIAL: case EPHEMERAL_SEQUENTIAL: return baseName + monotonicallyIncreasingNumber++;
- default: throw new UnsupportedOperationException(createMode + " support not implemented in MockCurator");
- }
- }
-
- /** Validates a path using the same rules as ZooKeeper */
- public static String validatePath(String path) throws IllegalArgumentException {
- if (path == null) throw new IllegalArgumentException("Path cannot be null");
- if (path.length() == 0) throw new IllegalArgumentException("Path length must be > 0");
- if (path.charAt(0) != '/') throw new IllegalArgumentException("Path must start with / character");
- if (path.length() == 1) return path; // done checking - it's the root
- if (path.charAt(path.length() - 1) == '/')
- throw new IllegalArgumentException("Path must not end with / character");
-
- String reason = null;
- char lastc = '/';
- char chars[] = path.toCharArray();
- char c;
- for (int i = 1; i < chars.length; lastc = chars[i], i++) {
- c = chars[i];
-
- if (c == 0) {
- reason = "null character not allowed @" + i;
- break;
- } else if (c == '/' && lastc == '/') {
- reason = "empty node name specified @" + i;
- break;
- } else if (c == '.' && lastc == '.') {
- if (chars[i-2] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) {
- reason = "relative paths not allowed @" + i;
- break;
- }
- } else if (c == '.') {
- if (chars[i-1] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) {
- reason = "relative paths not allowed @" + i;
- break;
- }
- } else if (c > '\u0000' && c < '\u001f' || c > '\u007f' && c < '\u009F'
- || c > '\ud800' && c < '\uf8ff' || c > '\ufff0' && c < '\uffff') {
- reason = "invalid charater @" + i;
- break;
- }
- }
-
- if (reason != null)
- throw new IllegalArgumentException("Invalid path string \"" + path + "\" caused by " + reason);
- return path;
- }
-
- // ----- Mock of Curator recipes accessed through our Curator interface -----
-
@Override
public DistributedAtomicLong createAtomicCounter(String path) {
- MockAtomicCounter counter = atomicCounters.get(path);
- if (counter == null) {
- counter = new MockAtomicCounter(path);
- atomicCounters.put(path, counter);
- }
- return counter;
+ return mockFramework().createAtomicCounter(path);
}
- /** Create a mutex which ensures exclusive access within this single vm */
@Override
public InterProcessLock createMutex(String path) {
- return new MockLock(path);
- }
-
- public MockCurator timeoutBarrierOnEnter(boolean shouldTimeout) {
- shouldTimeoutOnEnter = shouldTimeout;
- return this;
+ return mockFramework().createMutex(path);
}
@Override
public CompletionWaiter getCompletionWaiter(Path parentPath, int numMembers, String id) {
- return new MockCompletionWaiter();
+ return mockFramework().createCompletionWaiter();
}
@Override
public CompletionWaiter createCompletionWaiter(Path parentPath, String waiterNode, int numMembers, String id) {
- return new MockCompletionWaiter();
+ return mockFramework().createCompletionWaiter();
}
@Override
public DirectoryCache createDirectoryCache(String path, boolean cacheData, boolean dataIsCompressed, ExecutorService executorService) {
- return new MockDirectoryCache(Path.fromString(path));
+ return mockFramework().createDirectoryCache(path);
}
@Override
public FileCache createFileCache(String path, boolean dataIsCompressed) {
- return new MockFileCache(Path.fromString(path));
+ return mockFramework().createFileCache(path);
}
@Override
public int zooKeeperEnsembleCount() { return 1; }
- /**
- * Invocation of changes to the file system state is abstracted through this to allow transactional
- * changes to notify on commit
- */
- private abstract class Listeners {
-
- /** Translating method */
- public final void notify(Path path, byte[] data, PathChildrenCacheEvent.Type type) {
- String pathString = "/" + path.toString(); // this silly path class strips the leading "/" :-/
- PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, new ChildData(pathString, null, data));
- notify(path, event);
- }
-
- public abstract void notify(Path path, PathChildrenCacheEvent event);
-
- }
-
- /** The regular listener implementation which notifies registered file and directory listeners */
- private class ListenerMap extends Listeners {
-
- private final Map<Path, PathChildrenCacheListener> directoryListeners = new ConcurrentHashMap<>();
- private final Map<Path, NodeCacheListener> fileListeners = new ConcurrentHashMap<>();
-
- public void add(Path path, PathChildrenCacheListener listener) {
- directoryListeners.put(path, listener);
- }
-
- public void add(Path path, NodeCacheListener listener) {
- fileListeners.put(path, listener);
- }
-
- @Override
- public void notify(Path path, PathChildrenCacheEvent event) {
- try {
- // Snapshot directoryListeners in case notification leads to new directoryListeners added
- Set<Map.Entry<Path, PathChildrenCacheListener>> directoryListenerSnapshot = new HashSet<>(directoryListeners.entrySet());
- for (Map.Entry<Path, PathChildrenCacheListener> listener : directoryListenerSnapshot) {
- if (path.isChildOf(listener.getKey()))
- listener.getValue().childEvent(curatorFramework, event);
- }
-
- // Snapshot directoryListeners in case notification leads to new directoryListeners added
- Set<Map.Entry<Path, NodeCacheListener>> fileListenerSnapshot = new HashSet<>(fileListeners.entrySet());
- for (Map.Entry<Path, NodeCacheListener> listener : fileListenerSnapshot) {
- if (path.equals(listener.getKey()))
- listener.getValue().nodeChanged();
- }
- }
- catch (Exception e) {
- e.printStackTrace(); // TODO: Remove
- throw new RuntimeException("Exception notifying listeners", e);
- }
- }
-
- }
-
- private class MockCompletionWaiter implements CompletionWaiter {
-
- @Override
- public void awaitCompletion(Duration timeout) {
- if (shouldTimeoutOnEnter) {
- throw new CompletionTimeoutException("");
- }
- }
-
- @Override
- public void notifyCompletion() {
- }
-
- }
-
- /** A lock which works inside a single vm */
- private class MockLock extends InterProcessSemaphoreMutex {
-
- private final String path;
-
- private Lock lock = null;
-
- public MockLock(String path) {
- super(curatorFramework, path);
- this.path = path;
- }
-
- @Override
- public boolean acquire(long timeout, TimeUnit unit) {
- if (throwExceptionOnLock)
- throw new CuratorLockException("Thrown by mock");
- if (timeoutOnLock) return false;
-
- try {
- lock = locks.lock(path, timeout, unit);
- return true;
- }
- catch (UncheckedTimeoutException e) {
- return false;
- }
- }
-
- @Override
- public void acquire() {
- if (throwExceptionOnLock)
- throw new CuratorLockException("Thrown by mock");
-
- lock = locks.lock(path);
- }
-
- @Override
- public void release() {
- if (lock != null)
- lock.close();
- }
-
- }
-
- private class MockAtomicCounter extends DistributedAtomicLong {
-
- private boolean initialized = false;
- private MockLongValue value = new MockLongValue(0); // yes, uninitialized returns 0 :-/
-
- public MockAtomicCounter(String path) {
- super(curatorFramework, path, retryPolicy);
- }
-
- @Override
- public boolean initialize(Long value) {
- if (initialized) return false;
- this.value = new MockLongValue(value);
- initialized = true;
- return true;
- }
-
- @Override
- public AtomicValue<Long> get() {
- if (value == null) return new MockLongValue(0);
- return value;
- }
-
- public AtomicValue<Long> add(Long delta) throws Exception {
- return trySet(value.postValue() + delta);
- }
-
- public AtomicValue<Long> subtract(Long delta) throws Exception {
- return trySet(value.postValue() - delta);
- }
-
- @Override
- public AtomicValue<Long> increment() {
- return trySet(value.postValue() + 1);
- }
-
- public AtomicValue<Long> decrement() throws Exception {
- return trySet(value.postValue() - 1);
- }
-
- @Override
- public AtomicValue<Long> trySet(Long longval) {
- value = new MockLongValue(longval);
- return value;
- }
-
- public void forceSet(Long newValue) throws Exception {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- public AtomicValue<Long> compareAndSet(Long expectedValue, Long newValue) throws Exception {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- }
-
- private class MockLongValue implements AtomicValue<Long> {
-
- private AtomicLong value = new AtomicLong();
-
- public MockLongValue(long value) {
- this.value.set(value);
- }
-
- @Override
- public boolean succeeded() {
- return true;
- }
-
- public void setValue(long value) {
- this.value.set(value);
- }
-
- @Override
- public Long preValue() {
- return value.get();
- }
-
- @Override
- public Long postValue() {
- return value.get();
- }
-
- @Override
- public AtomicStats getStats() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- }
-
- private class MockDirectoryCache implements DirectoryCache {
-
- /** The path this is caching and listening to */
- private Path path;
-
- public MockDirectoryCache(Path path) {
- this.path = path;
- }
-
- @Override
- public void start() {}
-
- @Override
- public void addListener(PathChildrenCacheListener listener) {
- listeners.add(path, listener);
- }
-
- @Override
- public List<ChildData> getCurrentData() {
- List<ChildData> childData = new ArrayList<>();
- for (String childName : getChildren(path)) {
- Path childPath = path.append(childName);
- childData.add(new ChildData(childPath.getAbsolute(), null, getData(childPath).get()));
- }
- return childData;
- }
-
- @Override
- public ChildData getCurrentData(Path fullPath) {
- if (!fullPath.getParentPath().equals(path)) {
- throw new IllegalArgumentException("Path '" + fullPath + "' is not a child path of '" + path + "'");
- }
-
- return getData(fullPath).map(bytes -> new ChildData(fullPath.getAbsolute(), null, bytes)).orElse(null);
- }
-
- private void collectData(Node parent, Path parentPath, List<ChildData> data) {
- for (Node child : parent.children().values()) {
- Path childPath = parentPath.append(child.name());
- data.add(new ChildData("/" + childPath.toString(), null, child.getContent()));
- }
- }
-
- @Override
- public void close() {}
-
- }
-
- private class MockFileCache implements FileCache {
-
- /** The path this is caching and listening to */
- private Path path;
-
- public MockFileCache(Path path) {
- this.path = path;
- }
-
- @Override
- public void start() {}
-
- @Override
- public void addListener(NodeCacheListener listener) {
- listeners.add(path, listener);
- }
-
- @Override
- public ChildData getCurrentData() {
- Node node = fileSystem.root().getNode(Paths.get(path.toString()), false);
- if (node == null) return null;
- return new ChildData("/" + path.toString(), null, node.getContent());
- }
-
- @Override
- public void close() {}
-
- }
-
- // ----- The rest of this file is adapting the Curator (non-recipe) API to the -----
- // ----- file system methods above. -----
- // ----- There's nothing to see unless you are interested in an illustration of -----
- // ----- the folly of fluent API's or, more generally, mankind. -----
-
- private abstract class MockBackgroundACLPathAndBytesableBuilder<T> implements PathAndBytesable<T>, ProtectACLCreateModePathAndBytesable<T> {
-
- public BackgroundPathAndBytesable<T> withACL(List<ACL> list) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- public ACLBackgroundPathAndBytesable<T> withMode(CreateMode createMode) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ACLCreateModeBackgroundPathAndBytesable<String> withProtection() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- public T forPath(String s, byte[] bytes) throws Exception {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- public T forPath(String s) throws Exception {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- }
-
- private class MockCreateBuilder extends MockBackgroundACLPathAndBytesableBuilder<String> implements CreateBuilder {
-
- private boolean createParents = false;
- private CreateMode createMode = CreateMode.PERSISTENT;
-
- @Override
- public ProtectACLCreateModePathAndBytesable<String> creatingParentsIfNeeded() {
- createParents = true;
- return this;
- }
-
- @Override
- public ACLCreateModeBackgroundPathAndBytesable<String> withProtection() {
- // Protection against the server crashing after creating the file but before returning to the client.
- // Not relevant for an in-memory mock, obviously
- return this;
- }
-
- public ACLBackgroundPathAndBytesable<String> withMode(CreateMode createMode) {
- this.createMode = createMode;
- return this;
- }
-
- @Override
- public CreateBackgroundModeACLable compressed() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ProtectACLCreateModePathAndBytesable<String> creatingParentContainersIfNeeded() {
- // TODO: Add proper support for container nodes, see https://issues.apache.org/jira/browse/ZOOKEEPER-2163.
- return creatingParentsIfNeeded();
- }
-
- @Override
- @Deprecated
- public ACLPathAndBytesable<String> withProtectedEphemeralSequential() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- public String forPath(String s) throws Exception {
- return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners);
- }
-
- public String forPath(String s, byte[] bytes) throws Exception {
- return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners);
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(Object o) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
- }
-
- private class MockBackgroundPathableBuilder<T> implements BackgroundPathable<T>, Watchable<BackgroundPathable<T>> {
-
- @Override
- public ErrorListenerPathable<T> inBackground() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<T> inBackground(Object o) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Object o) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public T forPath(String s) throws Exception {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public BackgroundPathable<T> watched() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public BackgroundPathable<T> usingWatcher(Watcher watcher) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public BackgroundPathable<T> usingWatcher(CuratorWatcher curatorWatcher) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
- }
-
- private class MockGetChildrenBuilder extends MockBackgroundPathableBuilder<List<String>> implements GetChildrenBuilder {
-
- @Override
- public WatchPathable<List<String>> storingStatIn(Stat stat) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public List<String> forPath(String path) throws Exception {
- return getChildren(path, fileSystem.root());
- }
-
- }
-
- private class MockExistsBuilder extends MockBackgroundPathableBuilder<Stat> implements ExistsBuilder {
-
- @Override
- public Stat forPath(String path) throws Exception {
- try {
- Node node = getNode(path, fileSystem.root());
- Stat stat = new Stat();
- stat.setVersion(node.version());
- return stat;
- }
- catch (KeeperException.NoNodeException e) {
- return null;
- }
- }
-
- @Override
- public ExistsBuilderMain creatingParentContainersIfNeeded() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
- }
-
- private class MockDeleteBuilder extends MockBackgroundPathableBuilder<Void> implements DeleteBuilder {
-
- private boolean deleteChildren = false;
-
- @Override
- public BackgroundVersionable deletingChildrenIfNeeded() {
- deleteChildren = true;
- return this;
- }
-
- @Override
- public ChildrenDeletable guaranteed() {
- return this;
- }
-
- @Override
- public BackgroundPathable<Void> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- public Void forPath(String pathString) throws Exception {
- deleteNode(pathString, deleteChildren, fileSystem.root(), listeners);
- return null;
- }
-
- }
-
- private class MockGetDataBuilder extends MockBackgroundPathableBuilder<byte[]> implements GetDataBuilder {
-
- @Override
- public GetDataWatchBackgroundStatable decompressed() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public WatchPathable<byte[]> storingStatIn(Stat stat) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- public byte[] forPath(String path) throws Exception {
- return getData(path, fileSystem.root());
- }
-
- }
-
- private class MockSetDataBuilder extends MockBackgroundACLPathAndBytesableBuilder<Stat> implements SetDataBuilder {
-
- @Override
- public SetDataBackgroundVersionable compressed() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public BackgroundPathAndBytesable<Stat> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public Stat forPath(String path, byte[] bytes) throws Exception {
- setData(path, bytes, fileSystem.root(), listeners);
- return null;
- }
-
- @Override
- public ErrorListenerPathAndBytesable<Stat> inBackground() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<Stat> inBackground(Object o) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Object o) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
- }
-
- /** Allows addition of directoryListeners which are never called */
- private class MockListenable<T> implements Listenable<T> {
-
- @Override
- public void addListener(T t) {
- }
-
- @Override
- public void addListener(T t, Executor executor) {
- }
-
- @Override
- public void removeListener(T t) {
- }
-
- }
-
- private class MockCuratorTransactionFinal implements CuratorTransactionFinal {
-
- /** The new directory root in which the transactional changes are made */
- private Node newRoot;
-
- private boolean committed = false;
-
- private final DelayedListener delayedListener = new DelayedListener();
-
- public MockCuratorTransactionFinal() {
- newRoot = fileSystem.root().clone();
- }
-
- @Override
- public Collection<CuratorTransactionResult> commit() throws Exception {
- fileSystem.replaceRoot(newRoot);
- committed = true;
- delayedListener.commit();
- return null; // TODO
- }
-
- @Override
- public TransactionCreateBuilder create() {
- ensureNotCommitted();
- return new MockTransactionCreateBuilder();
- }
-
- @Override
- public TransactionDeleteBuilder delete() {
- ensureNotCommitted();
- return new MockTransactionDeleteBuilder();
- }
-
- @Override
- public TransactionSetDataBuilder setData() {
- ensureNotCommitted();
- return new MockTransactionSetDataBuilder();
- }
-
- @Override
- public TransactionCheckBuilder check() {
- ensureNotCommitted();
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- private void ensureNotCommitted() {
- if (committed) throw new IllegalStateException("transaction already committed");
- }
-
- private class MockTransactionCreateBuilder implements TransactionCreateBuilder {
-
- private CreateMode createMode = CreateMode.PERSISTENT;
-
- @Override
- public PathAndBytesable<CuratorTransactionBridge> withACL(List<ACL> list) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ACLCreateModePathAndBytesable<CuratorTransactionBridge> compressed() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ACLPathAndBytesable<CuratorTransactionBridge> withMode(CreateMode createMode) {
- this.createMode = createMode;
- return this;
- }
-
- @Override
- public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception {
- createNode(s, bytes, false, createMode, newRoot, delayedListener);
- return new MockCuratorTransactionBridge();
- }
-
- @Override
- public CuratorTransactionBridge forPath(String s) throws Exception {
- createNode(s, new byte[0], false, createMode, newRoot, delayedListener);
- return new MockCuratorTransactionBridge();
- }
-
- }
-
- private class MockTransactionDeleteBuilder implements TransactionDeleteBuilder {
-
- @Override
- public Pathable<CuratorTransactionBridge> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public CuratorTransactionBridge forPath(String path) throws Exception {
- deleteNode(path, false, newRoot, delayedListener);
- return new MockCuratorTransactionBridge();
- }
-
- }
-
- private class MockTransactionSetDataBuilder implements TransactionSetDataBuilder {
-
- @Override
- public VersionPathAndBytesable<CuratorTransactionBridge> compressed() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public PathAndBytesable<CuratorTransactionBridge> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception {
- MockCurator.this.setData(s, bytes, newRoot, delayedListener);
- return new MockCuratorTransactionBridge();
- }
-
- @Override
- public CuratorTransactionBridge forPath(String s) throws Exception {
- MockCurator.this.setData(s, new byte[0], newRoot, delayedListener);
- return new MockCuratorTransactionBridge();
- }
-
- }
-
- private class MockCuratorTransactionBridge implements CuratorTransactionBridge {
-
- @Override
- public CuratorTransactionFinal and() {
- return MockCuratorTransactionFinal.this;
- }
-
- }
-
- /** A class which collects listen events and forwards them to the regular directoryListeners on commit */
- private class DelayedListener extends Listeners {
-
- private final List<Pair<Path, PathChildrenCacheEvent>> events = new ArrayList<>();
-
- @Override
- public void notify(Path path, PathChildrenCacheEvent event) {
- events.add(new Pair<>(path, event));
- }
-
- public void commit() {
- for (Pair<Path, PathChildrenCacheEvent> event : events)
- listeners.notify(event.getFirst(), event.getSecond());
- }
-
- }
-
- }
-
- private class MockCuratorFramework implements CuratorFramework {
-
- private CuratorFrameworkState curatorState = CuratorFrameworkState.LATENT;
-
- @Override
- public void start() {
- curatorState = CuratorFrameworkState.STARTED;
- }
-
- @Override
- public void close() {
- curatorState = CuratorFrameworkState.STOPPED;
- }
-
- @Override
- public CuratorFrameworkState getState() {
- return curatorState;
- }
-
- @Override
- @Deprecated
- public boolean isStarted() {
- return curatorState == CuratorFrameworkState.STARTED;
- }
-
- @Override
- public CreateBuilder create() {
- return new MockCreateBuilder();
- }
-
- @Override
- public DeleteBuilder delete() {
- return new MockDeleteBuilder();
- }
-
- @Override
- public ExistsBuilder checkExists() {
- return new MockExistsBuilder();
- }
-
- @Override
- public GetDataBuilder getData() {
- return new MockGetDataBuilder();
- }
-
- @Override
- public SetDataBuilder setData() {
- return new MockSetDataBuilder();
- }
-
- @Override
- public GetChildrenBuilder getChildren() {
- return new MockGetChildrenBuilder();
- }
-
- @Override
- public GetACLBuilder getACL() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public SetACLBuilder setACL() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public CuratorTransaction inTransaction() {
- return new MockCuratorTransactionFinal();
- }
-
- @Override
- @Deprecated
- public void sync(String path, Object backgroundContextObject) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public void createContainers(String s) throws Exception {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public Listenable<ConnectionStateListener> getConnectionStateListenable() {
- return new MockListenable<>();
- }
-
- @Override
- public Listenable<CuratorListener> getCuratorListenable() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- @Deprecated
- public CuratorFramework nonNamespaceView() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public CuratorFramework usingNamespace(String newNamespace) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public String getNamespace() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public CuratorZookeeperClient getZookeeperClient() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Deprecated
- @Override
- public EnsurePath newNamespaceAwareEnsurePath(String path) {
- return new EnsurePath(path);
- }
-
- @Override
- public void clearWatcherReferences(Watcher watcher) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public boolean blockUntilConnected(int i, TimeUnit timeUnit) throws InterruptedException {
- return true;
- }
-
- @Override
- public void blockUntilConnected() throws InterruptedException {
-
- }
-
- @Override
- public SyncBuilder sync() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- }
-
}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
new file mode 100644
index 00000000000..9a845e56bfd
--- /dev/null
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
@@ -0,0 +1,1169 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.curator.mock;
+
+import com.google.common.util.concurrent.UncheckedTimeoutException;
+import com.yahoo.collections.Pair;
+import com.yahoo.concurrent.Lock;
+import com.yahoo.concurrent.Locks;
+import com.yahoo.path.Path;
+import com.yahoo.vespa.curator.CompletionTimeoutException;
+import com.yahoo.vespa.curator.Curator;
+import com.yahoo.vespa.curator.recipes.CuratorLockException;
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+import org.apache.curator.framework.api.ACLCreateModeBackgroundPathAndBytesable;
+import org.apache.curator.framework.api.ACLCreateModePathAndBytesable;
+import org.apache.curator.framework.api.ACLPathAndBytesable;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.BackgroundPathAndBytesable;
+import org.apache.curator.framework.api.BackgroundPathable;
+import org.apache.curator.framework.api.BackgroundVersionable;
+import org.apache.curator.framework.api.ChildrenDeletable;
+import org.apache.curator.framework.api.CreateBackgroundModeACLable;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.DeleteBuilder;
+import org.apache.curator.framework.api.ErrorListenerPathAndBytesable;
+import org.apache.curator.framework.api.ErrorListenerPathable;
+import org.apache.curator.framework.api.ExistsBuilder;
+import org.apache.curator.framework.api.ExistsBuilderMain;
+import org.apache.curator.framework.api.GetACLBuilder;
+import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.framework.api.GetDataBuilder;
+import org.apache.curator.framework.api.GetDataWatchBackgroundStatable;
+import org.apache.curator.framework.api.PathAndBytesable;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
+import org.apache.curator.framework.api.SetACLBuilder;
+import org.apache.curator.framework.api.SetDataBackgroundVersionable;
+import org.apache.curator.framework.api.SetDataBuilder;
+import org.apache.curator.framework.api.SyncBuilder;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.api.VersionPathAndBytesable;
+import org.apache.curator.framework.api.WatchPathable;
+import org.apache.curator.framework.api.Watchable;
+import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
+import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.framework.api.transaction.TransactionCheckBuilder;
+import org.apache.curator.framework.api.transaction.TransactionCreateBuilder;
+import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder;
+import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.recipes.atomic.AtomicStats;
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryForever;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A mock implementation of{@link CuratorFramework} for testing purposes.
+ *
+ * @author mpolden
+ */
+public class MockCuratorFramework implements CuratorFramework {
+
+ private final boolean shouldTimeoutOnEnter;
+ private final boolean stableOrdering;
+ private final Locks<String> locks = new Locks<>(Long.MAX_VALUE, TimeUnit.DAYS);
+
+ /** The file system used by this mock to store zookeeper files and directories */
+ private final MemoryFileSystem fileSystem = new MemoryFileSystem();
+
+ /** Atomic counters. A more accurate mock would store these as files in the file system */
+ private final Map<String, MockAtomicCounter> atomicCounters = new ConcurrentHashMap<>();
+
+ /** Listeners to changes to a particular path */
+ private final ListenerMap listeners = new ListenerMap();
+
+ private CuratorFrameworkState curatorState = CuratorFrameworkState.LATENT;
+ private int monotonicallyIncreasingNumber = 0;
+
+ public MockCuratorFramework(boolean stableOrdering, boolean shouldTimeoutOnEnter) {
+ this.stableOrdering = stableOrdering;
+ this.shouldTimeoutOnEnter = shouldTimeoutOnEnter;
+ }
+
+ public Map<String, MockAtomicCounter> atomicCounters() {
+ return Collections.unmodifiableMap(atomicCounters);
+ }
+
+ public MemoryFileSystem fileSystem() {
+ return fileSystem;
+ }
+
+ @Override
+ public void start() {
+ curatorState = CuratorFrameworkState.STARTED;
+ }
+
+ @Override
+ public void close() {
+ curatorState = CuratorFrameworkState.STOPPED;
+ }
+
+ @Override
+ public CuratorFrameworkState getState() {
+ return curatorState;
+ }
+
+ @Override
+ @Deprecated
+ public boolean isStarted() {
+ return curatorState == CuratorFrameworkState.STARTED;
+ }
+
+ @Override
+ public CreateBuilder create() {
+ return new MockCreateBuilder();
+ }
+
+ @Override
+ public DeleteBuilder delete() {
+ return new MockDeleteBuilder();
+ }
+
+ @Override
+ public ExistsBuilder checkExists() {
+ return new MockExistsBuilder();
+ }
+
+ @Override
+ public GetDataBuilder getData() {
+ return new MockGetDataBuilder();
+ }
+
+ @Override
+ public SetDataBuilder setData() {
+ return new MockSetDataBuilder();
+ }
+
+ @Override
+ public GetChildrenBuilder getChildren() {
+ return new MockGetChildrenBuilder();
+ }
+
+ @Override
+ public GetACLBuilder getACL() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public SetACLBuilder setACL() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public CuratorTransaction inTransaction() {
+ return new MockCuratorTransactionFinal();
+ }
+
+ @Override
+ @Deprecated
+ public void sync(String path, Object backgroundContextObject) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public void createContainers(String s) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public Listenable<ConnectionStateListener> getConnectionStateListenable() {
+ return new MockListenable<>();
+ }
+
+ @Override
+ public Listenable<CuratorListener> getCuratorListenable() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ @Deprecated
+ public CuratorFramework nonNamespaceView() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public CuratorFramework usingNamespace(String newNamespace) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public String getNamespace() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public CuratorZookeeperClient getZookeeperClient() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Deprecated
+ @Override
+ public EnsurePath newNamespaceAwareEnsurePath(String path) {
+ return new EnsurePath(path);
+ }
+
+ @Override
+ public void clearWatcherReferences(Watcher watcher) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public boolean blockUntilConnected(int i, TimeUnit timeUnit) {
+ return true;
+ }
+
+ @Override
+ public void blockUntilConnected() {
+ }
+
+ @Override
+ public SyncBuilder sync() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ // ----- Factory methods for mocks */
+
+ public InterProcessLock createMutex(String path) {
+ return new MockCuratorFramework.MockLock(path);
+ }
+
+ public MockAtomicCounter createAtomicCounter(String path) {
+ return atomicCounters.computeIfAbsent(path, (k) -> new MockAtomicCounter(path));
+ }
+
+ public Curator.CompletionWaiter createCompletionWaiter() {
+ return new MockCuratorFramework.MockCompletionWaiter();
+ }
+
+ public Curator.DirectoryCache createDirectoryCache(String path) {
+ return new MockDirectoryCache(Path.fromString(path));
+ }
+
+ public Curator.FileCache createFileCache(String path) {
+ return new MockFileCache(Path.fromString(path));
+ }
+
+ // ----- Start of adaptor methods from Curator to the mock file system -----
+
+ /** Creates a node below the given directory root */
+ private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, MemoryFileSystem.Node root, Listeners listeners)
+ throws KeeperException.NodeExistsException, KeeperException.NoNodeException {
+ validatePath(pathString);
+ Path path = Path.fromString(pathString);
+ if (path.isRoot()) return "/"; // the root already exists
+ MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), createParents);
+ String name = nodeName(path.getName(), createMode);
+
+ if (parent == null)
+ throw new KeeperException.NoNodeException(path.getParentPath().toString());
+ if (parent.children().containsKey(path.getName()))
+ throw new KeeperException.NodeExistsException(path.toString());
+
+ parent.add(name).setContent(content);
+ String nodePath = "/" + path.getParentPath().toString() + "/" + name;
+ listeners.notify(Path.fromString(nodePath), content, PathChildrenCacheEvent.Type.CHILD_ADDED);
+ return nodePath;
+ }
+
+ /** Deletes a node below the given directory root */
+ private void deleteNode(String pathString, boolean deleteChildren, MemoryFileSystem.Node root, Listeners listeners)
+ throws KeeperException.NoNodeException, KeeperException.NotEmptyException {
+ validatePath(pathString);
+ Path path = Path.fromString(pathString);
+ MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false);
+ if (parent == null) throw new KeeperException.NoNodeException(path.toString());
+ MemoryFileSystem.Node node = parent.children().get(path.getName());
+ if (node == null) throw new KeeperException.NoNodeException(path.getName() + " under " + parent);
+ if ( ! node.children().isEmpty() && ! deleteChildren)
+ throw new KeeperException.NotEmptyException(path.toString());
+ parent.remove(path.getName());
+ listeners.notify(path, new byte[0], PathChildrenCacheEvent.Type.CHILD_REMOVED);
+ }
+
+ /** Returns the data of a node */
+ private byte[] getData(String pathString, MemoryFileSystem.Node root) throws KeeperException.NoNodeException {
+ validatePath(pathString);
+ return getNode(pathString, root).getContent();
+ }
+
+ /** sets the data of an existing node */
+ private void setData(String pathString, byte[] content, MemoryFileSystem.Node root, Listeners listeners)
+ throws KeeperException.NoNodeException {
+ validatePath(pathString);
+ getNode(pathString, root).setContent(content);
+ listeners.notify(Path.fromString(pathString), content, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+ }
+
+ private List<String> getChildren(String path, MemoryFileSystem.Node root) throws KeeperException.NoNodeException {
+ validatePath(path);
+ MemoryFileSystem.Node node = root.getNode(Paths.get(path), false);
+ if (node == null) throw new KeeperException.NoNodeException(path);
+ List<String> children = new ArrayList<>(node.children().keySet());
+ if (! stableOrdering)
+ Collections.shuffle(children);
+ return children;
+ }
+
+ /** Returns a node or throws the appropriate exception if it doesn't exist */
+ private MemoryFileSystem.Node getNode(String pathString, MemoryFileSystem.Node root) throws KeeperException.NoNodeException {
+ validatePath(pathString);
+ Path path = Path.fromString(pathString);
+ MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false);
+ if (parent == null) throw new KeeperException.NoNodeException(path.toString());
+ MemoryFileSystem.Node node = parent.children().get(path.getName());
+ if (node == null) throw new KeeperException.NoNodeException(path.toString());
+ return node;
+ }
+
+ private String nodeName(String baseName, CreateMode createMode) {
+ switch (createMode) {
+ case PERSISTENT: case EPHEMERAL: return baseName;
+ case PERSISTENT_SEQUENTIAL: case EPHEMERAL_SEQUENTIAL: return baseName + monotonicallyIncreasingNumber++;
+ default: throw new UnsupportedOperationException(createMode + " support not implemented in MockCurator");
+ }
+ }
+
+ /** Validates a path using the same rules as ZooKeeper */
+ public static String validatePath(String path) throws IllegalArgumentException {
+ if (path == null) throw new IllegalArgumentException("Path cannot be null");
+ if (path.length() == 0) throw new IllegalArgumentException("Path length must be > 0");
+ if (path.charAt(0) != '/') throw new IllegalArgumentException("Path must start with / character");
+ if (path.length() == 1) return path; // done checking - it's the root
+ if (path.charAt(path.length() - 1) == '/')
+ throw new IllegalArgumentException("Path must not end with / character");
+
+ String reason = null;
+ char lastc = '/';
+ char[] chars = path.toCharArray();
+ char c;
+ for (int i = 1; i < chars.length; lastc = chars[i], i++) {
+ c = chars[i];
+
+ if (c == 0) {
+ reason = "null character not allowed @" + i;
+ break;
+ } else if (c == '/' && lastc == '/') {
+ reason = "empty node name specified @" + i;
+ break;
+ } else if (c == '.' && lastc == '.') {
+ if (chars[i-2] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) {
+ reason = "relative paths not allowed @" + i;
+ break;
+ }
+ } else if (c == '.') {
+ if (chars[i-1] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) {
+ reason = "relative paths not allowed @" + i;
+ break;
+ }
+ } else if (c > '\u0000' && c < '\u001f' || c > '\u007f' && c < '\u009F'
+ || c > '\ud800' && c < '\uf8ff' || c > '\ufff0' && c < '\uffff') {
+ reason = "invalid charater @" + i;
+ break;
+ }
+ }
+
+ if (reason != null)
+ throw new IllegalArgumentException("Invalid path string \"" + path + "\" caused by " + reason);
+ return path;
+ }
+
+ /**
+ * Invocation of changes to the file system state is abstracted through this to allow transactional
+ * changes to notify on commit
+ */
+ private abstract static class Listeners {
+
+ /** Translating method */
+ public final void notify(Path path, byte[] data, PathChildrenCacheEvent.Type type) {
+ String pathString = "/" + path.toString(); // this silly path class strips the leading "/" :-/
+ PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, new ChildData(pathString, null, data));
+ notify(path, event);
+ }
+
+ public abstract void notify(Path path, PathChildrenCacheEvent event);
+
+ }
+
+ /** The regular listener implementation which notifies registered file and directory listeners */
+ private class ListenerMap extends Listeners {
+
+ private final Map<Path, PathChildrenCacheListener> directoryListeners = new ConcurrentHashMap<>();
+ private final Map<Path, NodeCacheListener> fileListeners = new ConcurrentHashMap<>();
+
+ public void add(Path path, PathChildrenCacheListener listener) {
+ directoryListeners.put(path, listener);
+ }
+
+ public void add(Path path, NodeCacheListener listener) {
+ fileListeners.put(path, listener);
+ }
+
+ @Override
+ public void notify(Path path, PathChildrenCacheEvent event) {
+ try {
+ // Snapshot directoryListeners in case notification leads to new directoryListeners added
+ Set<Map.Entry<Path, PathChildrenCacheListener>> directoryListenerSnapshot = new HashSet<>(directoryListeners.entrySet());
+ for (Map.Entry<Path, PathChildrenCacheListener> listener : directoryListenerSnapshot) {
+ if (path.isChildOf(listener.getKey()))
+ listener.getValue().childEvent(MockCuratorFramework.this, event);
+ }
+
+ // Snapshot directoryListeners in case notification leads to new directoryListeners added
+ Set<Map.Entry<Path, NodeCacheListener>> fileListenerSnapshot = new HashSet<>(fileListeners.entrySet());
+ for (Map.Entry<Path, NodeCacheListener> listener : fileListenerSnapshot) {
+ if (path.equals(listener.getKey()))
+ listener.getValue().nodeChanged();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace(); // TODO: Remove
+ throw new RuntimeException("Exception notifying listeners", e);
+ }
+ }
+
+ }
+
+ private class MockCompletionWaiter implements Curator.CompletionWaiter {
+
+ @Override
+ public void awaitCompletion(Duration timeout) {
+ if (shouldTimeoutOnEnter) {
+ throw new CompletionTimeoutException("");
+ }
+ }
+
+ @Override
+ public void notifyCompletion() {
+ }
+
+ }
+
+ /** A lock which works inside a single vm */
+ private class MockLock extends InterProcessSemaphoreMutex {
+
+ public boolean timeoutOnLock = false;
+ public boolean throwExceptionOnLock = false;
+
+ private final String path;
+
+ private Lock lock = null;
+
+ public MockLock(String path) {
+ super(MockCuratorFramework.this, path);
+ this.path = path;
+ }
+
+ @Override
+ public boolean acquire(long timeout, TimeUnit unit) {
+ if (throwExceptionOnLock)
+ throw new CuratorLockException("Thrown by mock");
+ if (timeoutOnLock) return false;
+
+ try {
+ lock = locks.lock(path, timeout, unit);
+ return true;
+ }
+ catch (UncheckedTimeoutException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void acquire() {
+ if (throwExceptionOnLock)
+ throw new CuratorLockException("Thrown by mock");
+
+ lock = locks.lock(path);
+ }
+
+ @Override
+ public void release() {
+ if (lock != null)
+ lock.close();
+ }
+
+ }
+
+ private class MockAtomicCounter extends DistributedAtomicLong {
+
+ private boolean initialized = false;
+ private MockLongValue value = new MockLongValue(0); // yes, uninitialized returns 0 :-/
+
+ public MockAtomicCounter(String path) {
+ super(MockCuratorFramework.this, path, new RetryForever(1_000));
+ }
+
+ @Override
+ public boolean initialize(Long value) {
+ if (initialized) return false;
+ this.value = new MockLongValue(value);
+ initialized = true;
+ return true;
+ }
+
+ @Override
+ public AtomicValue<Long> get() {
+ if (value == null) return new MockLongValue(0);
+ return value;
+ }
+
+ public AtomicValue<Long> add(Long delta) {
+ return trySet(value.postValue() + delta);
+ }
+
+ public AtomicValue<Long> subtract(Long delta) {
+ return trySet(value.postValue() - delta);
+ }
+
+ @Override
+ public AtomicValue<Long> increment() {
+ return trySet(value.postValue() + 1);
+ }
+
+ public AtomicValue<Long> decrement() {
+ return trySet(value.postValue() - 1);
+ }
+
+ @Override
+ public AtomicValue<Long> trySet(Long longval) {
+ value = new MockLongValue(longval);
+ return value;
+ }
+
+ public void forceSet(Long newValue) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ public AtomicValue<Long> compareAndSet(Long expectedValue, Long newValue) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ }
+
+ private static class MockLongValue implements AtomicValue<Long> {
+
+ private final AtomicLong value = new AtomicLong();
+
+ public MockLongValue(long value) {
+ this.value.set(value);
+ }
+
+ @Override
+ public boolean succeeded() {
+ return true;
+ }
+
+ public void setValue(long value) {
+ this.value.set(value);
+ }
+
+ @Override
+ public Long preValue() {
+ return value.get();
+ }
+
+ @Override
+ public Long postValue() {
+ return value.get();
+ }
+
+ @Override
+ public AtomicStats getStats() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ }
+
+ private class MockDirectoryCache implements Curator.DirectoryCache {
+
+ /** The path this is caching and listening to */
+ private final Path path;
+
+ public MockDirectoryCache(Path path) {
+ this.path = path;
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void addListener(PathChildrenCacheListener listener) {
+ listeners.add(path, listener);
+ }
+
+ @Override
+ public List<ChildData> getCurrentData() {
+ List<ChildData> childData = new ArrayList<>();
+ for (String childName : getChildren(path)) {
+ Path childPath = path.append(childName);
+ childData.add(new ChildData(childPath.getAbsolute(), null, getData(childPath).get()));
+ }
+ return childData;
+ }
+
+ @Override
+ public ChildData getCurrentData(Path fullPath) {
+ if (!fullPath.getParentPath().equals(path)) {
+ throw new IllegalArgumentException("Path '" + fullPath + "' is not a child path of '" + path + "'");
+ }
+
+ return getData(fullPath).map(bytes -> new ChildData(fullPath.getAbsolute(), null, bytes)).orElse(null);
+ }
+
+ @Override
+ public void close() {}
+
+ private List<String> getChildren(Path path) {
+ try {
+ return MockCuratorFramework.this.getChildren().forPath(path.getAbsolute());
+ } catch (KeeperException.NoNodeException e) {
+ return List.of();
+ } catch (Exception e) {
+ throw new RuntimeException("Could not get children of " + path.getAbsolute(), e);
+ }
+ }
+
+ private Optional<byte[]> getData(Path path) {
+ try {
+ return Optional.of(MockCuratorFramework.this.getData().forPath(path.getAbsolute()));
+ }
+ catch (KeeperException.NoNodeException e) {
+ return Optional.empty();
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Could not get data at " + path.getAbsolute(), e);
+ }
+ }
+
+ }
+
+ private class MockFileCache implements Curator.FileCache {
+
+ /** The path this is caching and listening to */
+ private final Path path;
+
+ public MockFileCache(Path path) {
+ this.path = path;
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void addListener(NodeCacheListener listener) {
+ listeners.add(path, listener);
+ }
+
+ @Override
+ public ChildData getCurrentData() {
+ MemoryFileSystem.Node node = fileSystem.root().getNode(Paths.get(path.toString()), false);
+ if (node == null) return null;
+ return new ChildData("/" + path.toString(), null, node.getContent());
+ }
+
+ @Override
+ public void close() {}
+
+ }
+
+ // ----- The rest of this file is adapting the Curator (non-recipe) API to the -----
+ // ----- file system methods above. -----
+ // ----- There's nothing to see unless you are interested in an illustration of -----
+ // ----- the folly of fluent API's or, more generally, mankind. -----
+
+ private abstract static class MockBackgroundACLPathAndBytesableBuilder<T> implements PathAndBytesable<T>, ProtectACLCreateModePathAndBytesable<T> {
+
+ public BackgroundPathAndBytesable<T> withACL(List<ACL> list) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ public ACLBackgroundPathAndBytesable<T> withMode(CreateMode createMode) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ACLCreateModeBackgroundPathAndBytesable<String> withProtection() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ public T forPath(String s, byte[] bytes) throws Exception {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ public T forPath(String s) throws Exception {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ }
+
+ private class MockCreateBuilder extends MockBackgroundACLPathAndBytesableBuilder<String> implements CreateBuilder {
+
+ private boolean createParents = false;
+ private CreateMode createMode = CreateMode.PERSISTENT;
+
+ @Override
+ public ProtectACLCreateModePathAndBytesable<String> creatingParentsIfNeeded() {
+ createParents = true;
+ return this;
+ }
+
+ @Override
+ public ACLCreateModeBackgroundPathAndBytesable<String> withProtection() {
+ // Protection against the server crashing after creating the file but before returning to the client.
+ // Not relevant for an in-memory mock, obviously
+ return this;
+ }
+
+ public ACLBackgroundPathAndBytesable<String> withMode(CreateMode createMode) {
+ this.createMode = createMode;
+ return this;
+ }
+
+ @Override
+ public CreateBackgroundModeACLable compressed() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ProtectACLCreateModePathAndBytesable<String> creatingParentContainersIfNeeded() {
+ // TODO: Add proper support for container nodes, see https://issues.apache.org/jira/browse/ZOOKEEPER-2163.
+ return creatingParentsIfNeeded();
+ }
+
+ @Override
+ @Deprecated
+ public ACLPathAndBytesable<String> withProtectedEphemeralSequential() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ public String forPath(String s) throws Exception {
+ return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners);
+ }
+
+ public String forPath(String s, byte[] bytes) throws Exception {
+ return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners);
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(Object o) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+ }
+
+ private static class MockBackgroundPathableBuilder<T> implements BackgroundPathable<T>, Watchable<BackgroundPathable<T>> {
+
+ @Override
+ public ErrorListenerPathable<T> inBackground() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<T> inBackground(Object o) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Object o) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public T forPath(String s) throws Exception {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public BackgroundPathable<T> watched() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public BackgroundPathable<T> usingWatcher(Watcher watcher) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public BackgroundPathable<T> usingWatcher(CuratorWatcher curatorWatcher) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+ }
+
+ private class MockGetChildrenBuilder extends MockBackgroundPathableBuilder<List<String>> implements GetChildrenBuilder {
+
+ @Override
+ public WatchPathable<List<String>> storingStatIn(Stat stat) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public List<String> forPath(String path) throws Exception {
+ return getChildren(path, fileSystem.root());
+ }
+
+ }
+
+ private class MockExistsBuilder extends MockBackgroundPathableBuilder<Stat> implements ExistsBuilder {
+
+ @Override
+ public Stat forPath(String path) {
+ try {
+ MemoryFileSystem.Node node = getNode(path, fileSystem.root());
+ Stat stat = new Stat();
+ stat.setVersion(node.version());
+ return stat;
+ }
+ catch (KeeperException.NoNodeException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public ExistsBuilderMain creatingParentContainersIfNeeded() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+ }
+
+ private class MockDeleteBuilder extends MockBackgroundPathableBuilder<Void> implements DeleteBuilder {
+
+ private boolean deleteChildren = false;
+
+ @Override
+ public BackgroundVersionable deletingChildrenIfNeeded() {
+ deleteChildren = true;
+ return this;
+ }
+
+ @Override
+ public ChildrenDeletable guaranteed() {
+ return this;
+ }
+
+ @Override
+ public BackgroundPathable<Void> withVersion(int i) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ public Void forPath(String pathString) throws Exception {
+ deleteNode(pathString, deleteChildren, fileSystem.root(), listeners);
+ return null;
+ }
+
+ }
+
+ private class MockGetDataBuilder extends MockBackgroundPathableBuilder<byte[]> implements GetDataBuilder {
+
+ @Override
+ public GetDataWatchBackgroundStatable decompressed() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public WatchPathable<byte[]> storingStatIn(Stat stat) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ public byte[] forPath(String path) throws Exception {
+ return getData(path, fileSystem.root());
+ }
+
+ }
+
+ private class MockSetDataBuilder extends MockBackgroundACLPathAndBytesableBuilder<Stat> implements SetDataBuilder {
+
+ @Override
+ public SetDataBackgroundVersionable compressed() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public BackgroundPathAndBytesable<Stat> withVersion(int i) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public Stat forPath(String path, byte[] bytes) throws Exception {
+ setData(path, bytes, fileSystem.root(), listeners);
+ return null;
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<Stat> inBackground() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<Stat> inBackground(Object o) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Object o) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+ }
+
+ /** Allows addition of directoryListeners which are never called */
+ private static class MockListenable<T> implements Listenable<T> {
+
+ @Override
+ public void addListener(T t) {
+ }
+
+ @Override
+ public void addListener(T t, Executor executor) {
+ }
+
+ @Override
+ public void removeListener(T t) {
+ }
+
+ }
+
+ private class MockCuratorTransactionFinal implements CuratorTransactionFinal {
+
+ /** The new directory root in which the transactional changes are made */
+ private final MemoryFileSystem.Node newRoot;
+
+ private boolean committed = false;
+
+ private final MockCuratorTransactionFinal.DelayedListener delayedListener = new MockCuratorTransactionFinal.DelayedListener();
+
+ public MockCuratorTransactionFinal() {
+ newRoot = fileSystem.root().clone();
+ }
+
+ @Override
+ public Collection<CuratorTransactionResult> commit() {
+ fileSystem.replaceRoot(newRoot);
+ committed = true;
+ delayedListener.commit();
+ return null; // TODO
+ }
+
+ @Override
+ public TransactionCreateBuilder create() {
+ ensureNotCommitted();
+ return new MockCuratorTransactionFinal.MockTransactionCreateBuilder();
+ }
+
+ @Override
+ public TransactionDeleteBuilder delete() {
+ ensureNotCommitted();
+ return new MockCuratorTransactionFinal.MockTransactionDeleteBuilder();
+ }
+
+ @Override
+ public TransactionSetDataBuilder setData() {
+ ensureNotCommitted();
+ return new MockCuratorTransactionFinal.MockTransactionSetDataBuilder();
+ }
+
+ @Override
+ public TransactionCheckBuilder check() {
+ ensureNotCommitted();
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ private void ensureNotCommitted() {
+ if (committed) throw new IllegalStateException("transaction already committed");
+ }
+
+ private class MockTransactionCreateBuilder implements TransactionCreateBuilder {
+
+ private CreateMode createMode = CreateMode.PERSISTENT;
+
+ @Override
+ public PathAndBytesable<CuratorTransactionBridge> withACL(List<ACL> list) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ACLCreateModePathAndBytesable<CuratorTransactionBridge> compressed() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ACLPathAndBytesable<CuratorTransactionBridge> withMode(CreateMode createMode) {
+ this.createMode = createMode;
+ return this;
+ }
+
+ @Override
+ public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception {
+ createNode(s, bytes, false, createMode, newRoot, delayedListener);
+ return new MockCuratorTransactionFinal.MockCuratorTransactionBridge();
+ }
+
+ @Override
+ public CuratorTransactionBridge forPath(String s) throws Exception {
+ createNode(s, new byte[0], false, createMode, newRoot, delayedListener);
+ return new MockCuratorTransactionFinal.MockCuratorTransactionBridge();
+ }
+
+ }
+
+ private class MockTransactionDeleteBuilder implements TransactionDeleteBuilder {
+
+ @Override
+ public Pathable<CuratorTransactionBridge> withVersion(int i) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public CuratorTransactionBridge forPath(String path) throws Exception {
+ deleteNode(path, false, newRoot, delayedListener);
+ return new MockCuratorTransactionFinal.MockCuratorTransactionBridge();
+ }
+
+ }
+
+ private class MockTransactionSetDataBuilder implements TransactionSetDataBuilder {
+
+ @Override
+ public VersionPathAndBytesable<CuratorTransactionBridge> compressed() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public PathAndBytesable<CuratorTransactionBridge> withVersion(int i) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception {
+ MockCuratorFramework.this.setData(s, bytes, newRoot, delayedListener);
+ return new MockCuratorTransactionFinal.MockCuratorTransactionBridge();
+ }
+
+ @Override
+ public CuratorTransactionBridge forPath(String s) throws Exception {
+ MockCuratorFramework.this.setData(s, new byte[0], newRoot, delayedListener);
+ return new MockCuratorTransactionFinal.MockCuratorTransactionBridge();
+ }
+
+ }
+
+ private class MockCuratorTransactionBridge implements CuratorTransactionBridge {
+
+ @Override
+ public CuratorTransactionFinal and() {
+ return MockCuratorTransactionFinal.this;
+ }
+
+ }
+
+ /** A class which collects listen events and forwards them to the regular directoryListeners on commit */
+ private class DelayedListener extends Listeners {
+
+ private final List<Pair<Path, PathChildrenCacheEvent>> events = new ArrayList<>();
+
+ @Override
+ public void notify(Path path, PathChildrenCacheEvent event) {
+ events.add(new Pair<>(path, event));
+ }
+
+ public void commit() {
+ for (Pair<Path, PathChildrenCacheEvent> event : events)
+ listeners.notify(event.getFirst(), event.getSecond());
+ }
+
+ }
+
+ }
+
+}