diff options
author | Harald Musum <musum@verizonmedia.com> | 2020-11-26 11:43:38 +0100 |
---|---|---|
committer | Harald Musum <musum@verizonmedia.com> | 2020-11-26 11:43:38 +0100 |
commit | 3838f731e99e2a24871b3a90eb40d18579a56e40 (patch) | |
tree | 7e07cd3a87782c3f1a025e25127c8cdf5e2039d7 /zkfacade | |
parent | c2895954f3465b6cdda26c84ecb6a9d1fb8e4a0d (diff) | |
parent | 693277054fd2f122ae40aa011e848526fad8a64e (diff) |
Merge branch 'master' into revert-14062-revert-14057-hmusum/upgrade-to-curator-4
Diffstat (limited to 'zkfacade')
11 files changed, 1661 insertions, 1671 deletions
diff --git a/zkfacade/abi-spec.json b/zkfacade/abi-spec.json index e026559b283..e0de2622149 100644 --- a/zkfacade/abi-spec.json +++ b/zkfacade/abi-spec.json @@ -60,6 +60,7 @@ "com.yahoo.vespa.curator.Curator": { "superClass": "java.lang.Object", "interfaces": [ + "com.yahoo.vespa.curator.api.VespaCurator", "java.lang.AutoCloseable" ], "attributes": [ @@ -68,6 +69,7 @@ "methods": [ "public static com.yahoo.vespa.curator.Curator create(java.lang.String)", "public static com.yahoo.vespa.curator.Curator create(java.lang.String, java.util.Optional)", + "public void <init>(com.yahoo.cloud.config.CuratorConfig, com.yahoo.vespa.zookeeper.VespaZooKeeperServer)", "public void <init>(com.yahoo.cloud.config.ConfigserverConfig, com.yahoo.vespa.zookeeper.VespaZooKeeperServer)", "protected void <init>(java.lang.String, java.lang.String, java.util.function.Function)", "public java.lang.String connectionSpec()", @@ -89,7 +91,8 @@ "public org.apache.curator.framework.CuratorFramework framework()", "public void close()", "public java.lang.String zooKeeperEnsembleConnectionSpec()", - "public int zooKeeperEnsembleCount()" + "public int zooKeeperEnsembleCount()", + "public bridge synthetic java.lang.AutoCloseable lock(com.yahoo.path.Path, java.time.Duration)" ], "fields": [ "protected final org.apache.curator.RetryPolicy retryPolicy" @@ -110,5 +113,18 @@ "public void close()" ], "fields": [] + }, + "com.yahoo.vespa.curator.api.VespaCurator": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public", + "interface", + "abstract" + ], + "methods": [ + "public abstract java.lang.AutoCloseable lock(com.yahoo.path.Path, java.time.Duration)" + ], + "fields": [] } }
\ No newline at end of file diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java new file mode 100644 index 00000000000..4409291419a --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java @@ -0,0 +1,102 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator; + +import com.yahoo.net.HostName; + +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +/** + * A connection spec for Curator. + * + * @author mpolden + */ +class ConnectionSpec { + + private final String local; + private final String ensemble; + private final int ensembleSize; + + private ConnectionSpec(String local, String ensemble, int ensembleSize) { + this.local = requireNonEmpty(local, "local spec"); + this.ensemble = requireNonEmpty(ensemble, "ensemble spec"); + this.ensembleSize = ensembleSize; + } + + /** Returns the local spec. This may be a subset of the ensemble spec */ + public String local() { + return local; + } + + /** Returns the ensemble spec. This always contains all nodes in the ensemble */ + public String ensemble() { + return ensemble; + } + + /** Returns the number of servers in the ensemble */ + public int ensembleSize() { + return ensembleSize; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ConnectionSpec that = (ConnectionSpec) o; + return ensembleSize == that.ensembleSize && + local.equals(that.local) && + ensemble.equals(that.ensemble); + } + + @Override + public int hashCode() { + return Objects.hash(local, ensemble, ensembleSize); + } + + public static ConnectionSpec create(String spec) { + return create(spec, spec); + } + + public static ConnectionSpec create(String localSpec, String ensembleSpec) { + return new ConnectionSpec(localSpec, ensembleSpec, ensembleSpec.split(",").length); + } + + public static <T> ConnectionSpec create(List<T> servers, + Function<T, String> hostnameGetter, + Function<T, Integer> portGetter, + boolean localhostAffinity) { + String localSpec = createSpec(servers, hostnameGetter, portGetter, localhostAffinity); + String ensembleSpec = localhostAffinity ? createSpec(servers, hostnameGetter, portGetter, false) : localSpec; + return new ConnectionSpec(localSpec, ensembleSpec, servers.size()); + } + + private static <T> String createSpec(List<T> servers, + Function<T, String> hostnameGetter, + Function<T, Integer> portGetter, + boolean localhostAffinity) { + String thisServer = HostName.getLocalhost(); + StringBuilder connectionSpec = new StringBuilder(); + for (var server : servers) { + if (localhostAffinity && !thisServer.equals(hostnameGetter.apply(server))) continue; + connectionSpec.append(hostnameGetter.apply(server)); + connectionSpec.append(':'); + connectionSpec.append(portGetter.apply(server)); + connectionSpec.append(','); + } + if (localhostAffinity && connectionSpec.length() == 0) { + throw new IllegalArgumentException("Unable to create connect string to localhost: " + + "There is no localhost server specified in config"); + } + if (connectionSpec.length() > 0) { + connectionSpec.setLength(connectionSpec.length() - 1); // Remove trailing comma + } + return connectionSpec.toString(); + } + + private static String requireNonEmpty(String s, String field) { + if (Objects.requireNonNull(s).isEmpty()) throw new IllegalArgumentException(field + " must be non-empty"); + return s; + } + +} diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java index 04bd64219d4..90eec5760fc 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java @@ -3,10 +3,11 @@ package com.yahoo.vespa.curator; import com.google.inject.Inject; import com.yahoo.cloud.config.ConfigserverConfig; +import com.yahoo.cloud.config.CuratorConfig; import com.yahoo.io.IOUtils; -import com.yahoo.net.HostName; import com.yahoo.path.Path; import com.yahoo.text.Utf8; +import com.yahoo.vespa.curator.api.VespaCurator; import com.yahoo.vespa.curator.recipes.CuratorCounter; import com.yahoo.vespa.defaults.Defaults; import com.yahoo.vespa.zookeeper.VespaZooKeeperServer; @@ -31,6 +32,7 @@ import java.io.File; import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -47,7 +49,7 @@ import java.util.logging.Logger; * @author vegardh * @author bratseth */ -public class Curator implements AutoCloseable { +public class Curator implements VespaCurator, AutoCloseable { private static final Logger LOG = Logger.getLogger(Curator.class.getName()); private static final File ZK_CLIENT_CONFIG_FILE = new File(Defaults.getDefaults().underVespaHome("conf/zookeeper/zookeeper-client.cfg")); @@ -55,81 +57,67 @@ public class Curator implements AutoCloseable { private static final Duration ZK_CONNECTION_TIMEOUT = Duration.ofSeconds(30); private static final Duration BASE_SLEEP_TIME = Duration.ofSeconds(1); private static final int MAX_RETRIES = 10; + private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES); - protected final RetryPolicy retryPolicy; + protected final RetryPolicy retryPolicy = DEFAULT_RETRY_POLICY; private final CuratorFramework curatorFramework; - private final String connectionSpec; // May be a subset of the servers in the ensemble - private final String zooKeeperEnsembleConnectionSpec; - private final int zooKeeperEnsembleCount; + private final ConnectionSpec connectionSpec; // All lock keys, to allow re-entrancy. This will grow forever, but this should be too slow to be a problem private final ConcurrentHashMap<Path, Lock> locks = new ConcurrentHashMap<>(); /** Creates a curator instance from a comma-separated string of ZooKeeper host:port strings */ public static Curator create(String connectionSpec) { - return new Curator(connectionSpec, connectionSpec, Optional.of(ZK_CLIENT_CONFIG_FILE)); + return new Curator(ConnectionSpec.create(connectionSpec), Optional.of(ZK_CLIENT_CONFIG_FILE)); } // For testing only, use Optional.empty for clientConfigFile parameter to create default zookeeper client config public static Curator create(String connectionSpec, Optional<File> clientConfigFile) { - return new Curator(connectionSpec, connectionSpec, clientConfigFile); + return new Curator(ConnectionSpec.create(connectionSpec), clientConfigFile); } - // Depend on ZooKeeperServer to make sure it is started first - // TODO: Move zookeeperserver config out of configserverconfig (requires update of controller services.xml as well) @Inject - public Curator(ConfigserverConfig configserverConfig, VespaZooKeeperServer server) { - this(configserverConfig, Optional.of(ZK_CLIENT_CONFIG_FILE)); + public Curator(CuratorConfig curatorConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) { + // Depends on ZooKeeperServer to make sure it is started first + this(ConnectionSpec.create(curatorConfig.server(), + CuratorConfig.Server::hostname, + CuratorConfig.Server::port, + curatorConfig.zookeeperLocalhostAffinity()), + Optional.of(ZK_CLIENT_CONFIG_FILE)); } - Curator(ConfigserverConfig configserverConfig, Optional<File> clientConfigFile) { - this(createConnectionSpec(configserverConfig), createEnsembleConnectionSpec(configserverConfig), clientConfigFile); + // TODO: This can be removed when this package is no longer public API. + public Curator(ConfigserverConfig configserverConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) { + this(ConnectionSpec.create(configserverConfig.zookeeperserver(), + ConfigserverConfig.Zookeeperserver::hostname, + ConfigserverConfig.Zookeeperserver::port, + configserverConfig.zookeeperLocalhostAffinity()), + Optional.of(ZK_CLIENT_CONFIG_FILE)); } - private Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Optional<File> clientConfigFile) { - this(connectionSpec, - zooKeeperEnsembleConnectionSpec, - (retryPolicy) -> CuratorFrameworkFactory - .builder() - .retryPolicy(retryPolicy) - .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis()) - .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis()) - .connectString(connectionSpec) - .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile))) - .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8 - .build()); - } - - protected Curator(String connectionSpec, - String zooKeeperEnsembleConnectionSpec, - Function<RetryPolicy, CuratorFramework> curatorFactory) { - this(connectionSpec, zooKeeperEnsembleConnectionSpec, curatorFactory, - new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES)); + protected Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Function<RetryPolicy, CuratorFramework> curatorFactory) { + this(ConnectionSpec.create(connectionSpec, zooKeeperEnsembleConnectionSpec), curatorFactory.apply(DEFAULT_RETRY_POLICY)); } - private Curator(String connectionSpec, - String zooKeeperEnsembleConnectionSpec, - Function<RetryPolicy, CuratorFramework> curatorFactory, - RetryPolicy retryPolicy) { - this.connectionSpec = connectionSpec; - this.retryPolicy = retryPolicy; - this.curatorFramework = curatorFactory.apply(retryPolicy); - if (this.curatorFramework != null) { - validateConnectionSpec(connectionSpec); - validateConnectionSpec(zooKeeperEnsembleConnectionSpec); - addLoggingListener(); - curatorFramework.start(); - } - - this.zooKeeperEnsembleConnectionSpec = zooKeeperEnsembleConnectionSpec; - this.zooKeeperEnsembleCount = zooKeeperEnsembleConnectionSpec.split(",").length; + Curator(ConnectionSpec connectionSpec, Optional<File> clientConfigFile) { + this(connectionSpec, + CuratorFrameworkFactory + .builder() + .retryPolicy(DEFAULT_RETRY_POLICY) + .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis()) + .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis()) + .connectString(connectionSpec.local()) + .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile))) + .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8 + .build()); } - private static String createConnectionSpec(ConfigserverConfig configserverConfig) { - return configserverConfig.zookeeperLocalhostAffinity() - ? createConnectionSpecForLocalhost(configserverConfig) - : createEnsembleConnectionSpec(configserverConfig); + private Curator(ConnectionSpec connectionSpec, CuratorFramework curatorFramework) { + this.connectionSpec = Objects.requireNonNull(connectionSpec); + this.curatorFramework = Objects.requireNonNull(curatorFramework); + addLoggingListener(); + curatorFramework.start(); } private static ZKClientConfig createClientConfig(Optional<File> clientConfigFile) { @@ -148,39 +136,6 @@ public class Curator implements AutoCloseable { } } - private static String createEnsembleConnectionSpec(ConfigserverConfig config) { - StringBuilder connectionSpec = new StringBuilder(); - for (int i = 0; i < config.zookeeperserver().size(); i++) { - if (connectionSpec.length() > 0) { - connectionSpec.append(','); - } - ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i); - connectionSpec.append(server.hostname()); - connectionSpec.append(':'); - connectionSpec.append(server.port()); - } - return connectionSpec.toString(); - } - - static String createConnectionSpecForLocalhost(ConfigserverConfig config) { - String thisServer = HostName.getLocalhost(); - - for (int i = 0; i < config.zookeeperserver().size(); i++) { - ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i); - if (thisServer.equals(server.hostname())) { - return String.format("%s:%d", server.hostname(), server.port()); - } - } - - throw new IllegalArgumentException("Unable to create connect string to localhost: " + - "There is no localhost server specified in config: " + config); - } - - private static void validateConnectionSpec(String connectionSpec) { - if (connectionSpec == null || connectionSpec.isEmpty()) - throw new IllegalArgumentException(String.format("Connections spec '%s' is not valid", connectionSpec)); - } - /** * Returns the ZooKeeper "connect string" used by curator: a comma-separated list of * host:port of ZooKeeper endpoints to connect to. This may be a subset of @@ -189,7 +144,7 @@ public class Curator implements AutoCloseable { * * This may be empty but never null */ - public String connectionSpec() { return connectionSpec; } + public String connectionSpec() { return connectionSpec.local(); } /** For internal use; prefer creating a {@link CuratorCounter} */ public DistributedAtomicLong createAtomicCounter(String path) { @@ -243,13 +198,14 @@ public class Curator implements AutoCloseable { * A convenience method which sets some content at a path. * If the path and any of its parents does not exists they are created. */ + // TODO: Use create().orSetData() in Curator 4 and later public void set(Path path, byte[] data) { + if ( ! exists(path)) + create(path); + String absolutePath = path.getAbsolute(); try { - if ( ! exists(path)) - framework().create().creatingParentsIfNeeded().forPath(absolutePath, data); - else - framework().setData().forPath(absolutePath, data); + framework().setData().forPath(absolutePath, data); } catch (Exception e) { throw new RuntimeException("Could not set data at " + absolutePath, e); } @@ -432,7 +388,7 @@ public class Curator implements AutoCloseable { * TODO: Move method out of this class. */ public String zooKeeperEnsembleConnectionSpec() { - return zooKeeperEnsembleConnectionSpec; + return connectionSpec.ensemble(); } /** @@ -440,7 +396,7 @@ public class Curator implements AutoCloseable { * WARNING: This may be different from the number of servers this Curator may connect to. * TODO: Move method out of this class. */ - public int zooKeeperEnsembleCount() { return zooKeeperEnsembleCount; } + public int zooKeeperEnsembleCount() { return connectionSpec.ensembleSize(); } private static Optional<String> getEnvironmentVariable(String variableName) { return Optional.ofNullable(System.getenv().get(variableName)) diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java index fb8c303db66..0f8c524fa98 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java @@ -57,6 +57,8 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter { List<String> respondents; do { respondents = curator.framework().getChildren().forPath(barrierPath); + log.log(Level.FINE, respondents.size() + "/" + curator.zooKeeperEnsembleCount() + " responded: " + + respondents + ", all participants: " + curator.zooKeeperEnsembleConnectionSpec()); // First, check if all config servers responded if (respondents.size() == curator.zooKeeperEnsembleCount()) { diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java new file mode 100644 index 00000000000..0b6fa55719e --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java @@ -0,0 +1,19 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator.api; + +import com.yahoo.path.Path; + +import java.time.Duration; + +/** + * A client for a ZooKeeper cluster running inside Vespa. Applications that want to use ZooKeeper can inject this in + * their code. + * + * @author mpolden + */ +public interface VespaCurator { + + /** Create and acquire a re-entrant lock in given path. This blocks until the lock is acquired or timeout elapses. */ + AutoCloseable lock(Path path, Duration timeout); + +} diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/package-info.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/package-info.java new file mode 100644 index 00000000000..679dd3750cb --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/package-info.java @@ -0,0 +1,10 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * @author mpolden + */ +@PublicApi +@ExportPackage +package com.yahoo.vespa.curator.api; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java index 1f583ada7a1..26f1c336874 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java @@ -1,110 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.curator.mock; -import com.google.common.util.concurrent.UncheckedTimeoutException; import com.google.inject.Inject; -import com.yahoo.collections.Pair; -import com.yahoo.concurrent.Lock; -import com.yahoo.concurrent.Locks; import com.yahoo.path.Path; -import com.yahoo.vespa.curator.CompletionTimeoutException; import com.yahoo.vespa.curator.Curator; -import com.yahoo.vespa.curator.recipes.CuratorLockException; -import org.apache.curator.CuratorZookeeperClient; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.WatcherRemoveCuratorFramework; -import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable; -import org.apache.curator.framework.api.ACLCreateModeBackgroundPathAndBytesable; -import org.apache.curator.framework.api.ACLCreateModePathAndBytesable; -import org.apache.curator.framework.api.ACLCreateModeStatBackgroundPathAndBytesable; -import org.apache.curator.framework.api.ACLPathAndBytesable; -import org.apache.curator.framework.api.ACLableExistBuilderMain; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.BackgroundPathAndBytesable; -import org.apache.curator.framework.api.BackgroundPathable; -import org.apache.curator.framework.api.BackgroundVersionable; -import org.apache.curator.framework.api.ChildrenDeletable; -import org.apache.curator.framework.api.CreateBackgroundModeStatACLable; -import org.apache.curator.framework.api.CreateBuilder; -import org.apache.curator.framework.api.CreateBuilder2; -import org.apache.curator.framework.api.CreateBuilderMain; -import org.apache.curator.framework.api.CreateProtectACLCreateModePathAndBytesable; -import org.apache.curator.framework.api.CuratorListener; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.framework.api.DeleteBuilder; -import org.apache.curator.framework.api.DeleteBuilderMain; -import org.apache.curator.framework.api.ErrorListenerPathAndBytesable; -import org.apache.curator.framework.api.ErrorListenerPathable; -import org.apache.curator.framework.api.ExistsBuilder; -import org.apache.curator.framework.api.GetACLBuilder; -import org.apache.curator.framework.api.GetChildrenBuilder; -import org.apache.curator.framework.api.GetConfigBuilder; -import org.apache.curator.framework.api.GetDataBuilder; -import org.apache.curator.framework.api.GetDataWatchBackgroundStatable; -import org.apache.curator.framework.api.PathAndBytesable; -import org.apache.curator.framework.api.Pathable; -import org.apache.curator.framework.api.ProtectACLCreateModeStatPathAndBytesable; -import org.apache.curator.framework.api.ReconfigBuilder; -import org.apache.curator.framework.api.RemoveWatchesBuilder; -import org.apache.curator.framework.api.SetACLBuilder; -import org.apache.curator.framework.api.SetDataBackgroundVersionable; -import org.apache.curator.framework.api.SetDataBuilder; -import org.apache.curator.framework.api.SyncBuilder; -import org.apache.curator.framework.api.UnhandledErrorListener; -import org.apache.curator.framework.api.VersionPathAndBytesable; -import org.apache.curator.framework.api.WatchPathable; -import org.apache.curator.framework.api.Watchable; -import org.apache.curator.framework.api.transaction.CuratorMultiTransaction; -import org.apache.curator.framework.api.transaction.CuratorTransaction; -import org.apache.curator.framework.api.transaction.CuratorTransactionBridge; -import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; -import org.apache.curator.framework.api.transaction.CuratorTransactionResult; -import org.apache.curator.framework.api.transaction.TransactionCheckBuilder; -import org.apache.curator.framework.api.transaction.TransactionCreateBuilder; -import org.apache.curator.framework.api.transaction.TransactionCreateBuilder2; -import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder; -import org.apache.curator.framework.api.transaction.TransactionOp; -import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder; -import org.apache.curator.framework.imps.CuratorFrameworkState; -import org.apache.curator.framework.listen.Listenable; -import org.apache.curator.framework.recipes.atomic.AtomicStats; -import org.apache.curator.framework.recipes.atomic.AtomicValue; import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.NodeCacheListener; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.locks.InterProcessLock; -import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; -import org.apache.curator.framework.schema.SchemaSet; -import org.apache.curator.framework.state.ConnectionStateErrorPolicy; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.utils.EnsurePath; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; -import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; -import java.nio.file.Paths; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import static com.yahoo.vespa.curator.mock.MemoryFileSystem.Node; /** * <p>A <b>non thread safe</b> mock of the curator API. @@ -121,24 +25,7 @@ import static com.yahoo.vespa.curator.mock.MemoryFileSystem.Node; */ public class MockCurator extends Curator { - public boolean timeoutOnLock = false; - public boolean throwExceptionOnLock = false; - private boolean shouldTimeoutOnEnter = false; - private int monotonicallyIncreasingNumber = 0; - private final boolean stableOrdering; private String zooKeeperEnsembleConnectionSpec = ""; - private final Locks<String> locks = new Locks<>(Long.MAX_VALUE, TimeUnit.DAYS); - - /** The file system used by this mock to store zookeeper files and directories */ - private final MemoryFileSystem fileSystem = new MemoryFileSystem(); - - /** Atomic counters. A more accurate mock would store these as files in the file system */ - private final Map<String, MockAtomicCounter> atomicCounters = new ConcurrentHashMap<>(); - - /** Listeners to changes to a particular path */ - private final ListenerMap listeners = new ListenerMap(); - - private final CuratorFramework curatorFramework; /** Creates a mock curator with stable ordering */ @Inject @@ -153,26 +40,24 @@ public class MockCurator extends Curator { * This is not what ZooKeeper does. */ public MockCurator(boolean stableOrdering) { - super("", "", (retryPolicy) -> null); - this.stableOrdering = stableOrdering; - curatorFramework = new MockCuratorFramework(); - curatorFramework.start(); + super("host1:2181", "host1:2181", (retryPolicy) -> new MockCuratorFramework(stableOrdering, false)); + } + + private MockCuratorFramework mockFramework() { + return (MockCuratorFramework) super.framework(); } /** * Lists the entire content of this curator instance as a multiline string. * Useful for debugging. */ - public String dumpState() { return fileSystem.dumpState(); } - - /** Returns a started curator framework */ - public CuratorFramework framework() { return curatorFramework; } + public String dumpState() { return mockFramework().fileSystem().dumpState(); } /** Returns an atomic counter in this, or empty if no such counter is created */ public Optional<DistributedAtomicLong> counter(String path) { - return Optional.ofNullable(atomicCounters.get(path)); + return Optional.ofNullable(mockFramework().atomicCounters().get(path)); } - + /** * Sets the ZooKeeper ensemble connection spec, which must be on the form * host1:port,host2:port ... @@ -186,1455 +71,37 @@ public class MockCurator extends Curator { return zooKeeperEnsembleConnectionSpec; } - // ----- Start of adaptor methods from Curator to the mock file system ----- - - /** Creates a node below the given directory root */ - private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, Node root, Listeners listeners) - throws KeeperException.NodeExistsException, KeeperException.NoNodeException { - validatePath(pathString); - Path path = Path.fromString(pathString); - if (path.isRoot()) return "/"; // the root already exists - Node parent = root.getNode(Paths.get(path.getParentPath().toString()), createParents); - String name = nodeName(path.getName(), createMode); - - if (parent == null) - throw new KeeperException.NoNodeException(path.getParentPath().toString()); - if (parent.children().containsKey(path.getName())) - throw new KeeperException.NodeExistsException(path.toString()); - - parent.add(name).setContent(content); - String nodePath = "/" + path.getParentPath().toString() + "/" + name; - listeners.notify(Path.fromString(nodePath), content, PathChildrenCacheEvent.Type.CHILD_ADDED); - return nodePath; - } - - /** Deletes a node below the given directory root */ - private void deleteNode(String pathString, boolean deleteChildren, Node root, Listeners listeners) - throws KeeperException.NoNodeException, KeeperException.NotEmptyException { - validatePath(pathString); - Path path = Path.fromString(pathString); - Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false); - if (parent == null) throw new KeeperException.NoNodeException(path.toString()); - Node node = parent.children().get(path.getName()); - if (node == null) throw new KeeperException.NoNodeException(path.getName() + " under " + parent); - if ( ! node.children().isEmpty() && ! deleteChildren) - throw new KeeperException.NotEmptyException(path.toString()); - parent.remove(path.getName()); - listeners.notify(path, new byte[0], PathChildrenCacheEvent.Type.CHILD_REMOVED); - } - - /** Returns the data of a node */ - private byte[] getData(String pathString, Node root) throws KeeperException.NoNodeException { - validatePath(pathString); - return getNode(pathString, root).getContent(); - } - - /** sets the data of an existing node */ - private void setData(String pathString, byte[] content, Node root, Listeners listeners) - throws KeeperException.NoNodeException { - validatePath(pathString); - getNode(pathString, root).setContent(content); - listeners.notify(Path.fromString(pathString), content, PathChildrenCacheEvent.Type.CHILD_UPDATED); - } - - private List<String> getChildren(String path, Node root) throws KeeperException.NoNodeException { - validatePath(path); - Node node = root.getNode(Paths.get(path), false); - if (node == null) throw new KeeperException.NoNodeException(path); - List<String> children = new ArrayList<>(node.children().keySet()); - if (! stableOrdering) - Collections.shuffle(children); - return children; - } - - private boolean exists(String path, Node root) { - validatePath(path); - Node parent = root.getNode(Paths.get(Path.fromString(path).getParentPath().toString()), false); - if (parent == null) return false; - Node node = parent.children().get(Path.fromString(path).getName()); - return node != null; - } - - /** Returns a node or throws the appropriate exception if it doesn't exist */ - private Node getNode(String pathString, Node root) throws KeeperException.NoNodeException { - validatePath(pathString); - Path path = Path.fromString(pathString); - Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false); - if (parent == null) throw new KeeperException.NoNodeException(path.toString()); - Node node = parent.children().get(path.getName()); - if (node == null) throw new KeeperException.NoNodeException(path.toString()); - return node; - } - - private String nodeName(String baseName, CreateMode createMode) { - switch (createMode) { - case PERSISTENT: case EPHEMERAL: return baseName; - case PERSISTENT_SEQUENTIAL: case EPHEMERAL_SEQUENTIAL: return baseName + monotonicallyIncreasingNumber++; - default: throw new UnsupportedOperationException(createMode + " support not implemented in MockCurator"); - } - } - - /** Validates a path using the same rules as ZooKeeper */ - public static String validatePath(String path) throws IllegalArgumentException { - if (path == null) throw new IllegalArgumentException("Path cannot be null"); - if (path.length() == 0) throw new IllegalArgumentException("Path length must be > 0"); - if (path.charAt(0) != '/') throw new IllegalArgumentException("Path must start with / character"); - if (path.length() == 1) return path; // done checking - it's the root - if (path.charAt(path.length() - 1) == '/') - throw new IllegalArgumentException("Path must not end with / character"); - - String reason = null; - char lastc = '/'; - char chars[] = path.toCharArray(); - char c; - for (int i = 1; i < chars.length; lastc = chars[i], i++) { - c = chars[i]; - - if (c == 0) { - reason = "null character not allowed @" + i; - break; - } else if (c == '/' && lastc == '/') { - reason = "empty node name specified @" + i; - break; - } else if (c == '.' && lastc == '.') { - if (chars[i-2] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) { - reason = "relative paths not allowed @" + i; - break; - } - } else if (c == '.') { - if (chars[i-1] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) { - reason = "relative paths not allowed @" + i; - break; - } - } else if (c > '\u0000' && c < '\u001f' || c > '\u007f' && c < '\u009F' - || c > '\ud800' && c < '\uf8ff' || c > '\ufff0' && c < '\uffff') { - reason = "invalid charater @" + i; - break; - } - } - - if (reason != null) - throw new IllegalArgumentException("Invalid path string \"" + path + "\" caused by " + reason); - return path; - } - - // ----- Mock of Curator recipes accessed through our Curator interface ----- - @Override public DistributedAtomicLong createAtomicCounter(String path) { - MockAtomicCounter counter = atomicCounters.get(path); - if (counter == null) { - counter = new MockAtomicCounter(path); - atomicCounters.put(path, counter); - } - return counter; + return mockFramework().createAtomicCounter(path); } - /** Create a mutex which ensures exclusive access within this single vm */ @Override public InterProcessLock createMutex(String path) { - return new MockLock(path); - } - - public MockCurator timeoutBarrierOnEnter(boolean shouldTimeout) { - shouldTimeoutOnEnter = shouldTimeout; - return this; + return mockFramework().createMutex(path); } @Override public CompletionWaiter getCompletionWaiter(Path parentPath, int numMembers, String id) { - return new MockCompletionWaiter(); + return mockFramework().createCompletionWaiter(); } @Override public CompletionWaiter createCompletionWaiter(Path parentPath, String waiterNode, int numMembers, String id) { - return new MockCompletionWaiter(); + return mockFramework().createCompletionWaiter(); } @Override public DirectoryCache createDirectoryCache(String path, boolean cacheData, boolean dataIsCompressed, ExecutorService executorService) { - return new MockDirectoryCache(Path.fromString(path)); + return mockFramework().createDirectoryCache(path); } @Override public FileCache createFileCache(String path, boolean dataIsCompressed) { - return new MockFileCache(Path.fromString(path)); + return mockFramework().createFileCache(path); } @Override public int zooKeeperEnsembleCount() { return 1; } - /** - * Invocation of changes to the file system state is abstracted through this to allow transactional - * changes to notify on commit - */ - private abstract class Listeners { - - /** Translating method */ - public final void notify(Path path, byte[] data, PathChildrenCacheEvent.Type type) { - String pathString = "/" + path.toString(); // this silly path class strips the leading "/" :-/ - PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, new ChildData(pathString, null, data)); - notify(path, event); - } - - public abstract void notify(Path path, PathChildrenCacheEvent event); - - } - - /** The regular listener implementation which notifies registered file and directory listeners */ - private class ListenerMap extends Listeners { - - private final Map<Path, PathChildrenCacheListener> directoryListeners = new ConcurrentHashMap<>(); - private final Map<Path, NodeCacheListener> fileListeners = new ConcurrentHashMap<>(); - - public void add(Path path, PathChildrenCacheListener listener) { - directoryListeners.put(path, listener); - } - - public void add(Path path, NodeCacheListener listener) { - fileListeners.put(path, listener); - } - - @Override - public void notify(Path path, PathChildrenCacheEvent event) { - try { - // Snapshot directoryListeners in case notification leads to new directoryListeners added - Set<Map.Entry<Path, PathChildrenCacheListener>> directoryListenerSnapshot = new HashSet<>(directoryListeners.entrySet()); - for (Map.Entry<Path, PathChildrenCacheListener> listener : directoryListenerSnapshot) { - if (path.isChildOf(listener.getKey())) - listener.getValue().childEvent(curatorFramework, event); - } - - // Snapshot directoryListeners in case notification leads to new directoryListeners added - Set<Map.Entry<Path, NodeCacheListener>> fileListenerSnapshot = new HashSet<>(fileListeners.entrySet()); - for (Map.Entry<Path, NodeCacheListener> listener : fileListenerSnapshot) { - if (path.equals(listener.getKey())) - listener.getValue().nodeChanged(); - } - } - catch (Exception e) { - e.printStackTrace(); // TODO: Remove - throw new RuntimeException("Exception notifying listeners", e); - } - } - - } - - private class MockCompletionWaiter implements CompletionWaiter { - - @Override - public void awaitCompletion(Duration timeout) { - if (shouldTimeoutOnEnter) { - throw new CompletionTimeoutException(""); - } - } - - @Override - public void notifyCompletion() { - } - - } - - /** A lock which works inside a single vm */ - private class MockLock extends InterProcessSemaphoreMutex { - - private final String path; - - private Lock lock = null; - - public MockLock(String path) { - super(curatorFramework, path); - this.path = path; - } - - @Override - public boolean acquire(long timeout, TimeUnit unit) { - if (throwExceptionOnLock) - throw new CuratorLockException("Thrown by mock"); - if (timeoutOnLock) return false; - - try { - lock = locks.lock(path, timeout, unit); - return true; - } - catch (UncheckedTimeoutException e) { - return false; - } - } - - @Override - public void acquire() { - if (throwExceptionOnLock) - throw new CuratorLockException("Thrown by mock"); - - lock = locks.lock(path); - } - - @Override - public void release() { - if (lock != null) - lock.close(); - } - - } - - private class MockAtomicCounter extends DistributedAtomicLong { - - private boolean initialized = false; - private MockLongValue value = new MockLongValue(0); // yes, uninitialized returns 0 :-/ - - public MockAtomicCounter(String path) { - super(curatorFramework, path, retryPolicy); - } - - @Override - public boolean initialize(Long value) { - if (initialized) return false; - this.value = new MockLongValue(value); - initialized = true; - return true; - } - - @Override - public AtomicValue<Long> get() { - if (value == null) return new MockLongValue(0); - return value; - } - - public AtomicValue<Long> add(Long delta) throws Exception { - return trySet(value.postValue() + delta); - } - - public AtomicValue<Long> subtract(Long delta) throws Exception { - return trySet(value.postValue() - delta); - } - - @Override - public AtomicValue<Long> increment() { - return trySet(value.postValue() + 1); - } - - public AtomicValue<Long> decrement() throws Exception { - return trySet(value.postValue() - 1); - } - - @Override - public AtomicValue<Long> trySet(Long longval) { - value = new MockLongValue(longval); - return value; - } - - public void forceSet(Long newValue) throws Exception { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - public AtomicValue<Long> compareAndSet(Long expectedValue, Long newValue) throws Exception { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - } - - private class MockLongValue implements AtomicValue<Long> { - - private AtomicLong value = new AtomicLong(); - - public MockLongValue(long value) { - this.value.set(value); - } - - @Override - public boolean succeeded() { - return true; - } - - public void setValue(long value) { - this.value.set(value); - } - - @Override - public Long preValue() { - return value.get(); - } - - @Override - public Long postValue() { - return value.get(); - } - - @Override - public AtomicStats getStats() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - } - - private class MockDirectoryCache implements DirectoryCache { - - /** The path this is caching and listening to */ - private Path path; - - public MockDirectoryCache(Path path) { - this.path = path; - } - - @Override - public void start() {} - - @Override - public void addListener(PathChildrenCacheListener listener) { - listeners.add(path, listener); - } - - @Override - public List<ChildData> getCurrentData() { - List<ChildData> childData = new ArrayList<>(); - for (String childName : getChildren(path)) { - Path childPath = path.append(childName); - childData.add(new ChildData(childPath.getAbsolute(), null, getData(childPath).get())); - } - return childData; - } - - @Override - public ChildData getCurrentData(Path fullPath) { - if (!fullPath.getParentPath().equals(path)) { - throw new IllegalArgumentException("Path '" + fullPath + "' is not a child path of '" + path + "'"); - } - - return getData(fullPath).map(bytes -> new ChildData(fullPath.getAbsolute(), null, bytes)).orElse(null); - } - - private void collectData(Node parent, Path parentPath, List<ChildData> data) { - for (Node child : parent.children().values()) { - Path childPath = parentPath.append(child.name()); - data.add(new ChildData("/" + childPath.toString(), null, child.getContent())); - } - } - - @Override - public void close() {} - - } - - private class MockFileCache implements FileCache { - - /** The path this is caching and listening to */ - private Path path; - - public MockFileCache(Path path) { - this.path = path; - } - - @Override - public void start() {} - - @Override - public void addListener(NodeCacheListener listener) { - listeners.add(path, listener); - } - - @Override - public ChildData getCurrentData() { - Node node = fileSystem.root().getNode(Paths.get(path.toString()), false); - if (node == null) return null; - return new ChildData("/" + path.toString(), null, node.getContent()); - } - - @Override - public void close() {} - - } - - // ----- The rest of this file is adapting the Curator (non-recipe) API to the ----- - // ----- file system methods above. ----- - // ----- There's nothing to see unless you are interested in an illustration of ----- - // ----- the folly of fluent API's or, more generally, mankind. ----- - private abstract static class MockProtectACLCreateModeStatPathAndBytesable<String> - implements ProtectACLCreateModeStatPathAndBytesable<String> { - - public BackgroundPathAndBytesable<String> withACL(List<ACL> list) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - public BackgroundPathAndBytesable<String> withACL(List<ACL> list, boolean b) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - public ProtectACLCreateModeStatPathAndBytesable<String> withMode(CreateMode createMode) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ACLCreateModeBackgroundPathAndBytesable<java.lang.String> withProtection() { - return null; - } - - @Override - public ErrorListenerPathAndBytesable<String> inBackground() { - return null; - } - - @Override - public ErrorListenerPathAndBytesable<String> inBackground(Object o) { - return null; - } - - @Override - public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback) { - return null; - } - - @Override - public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o) { - return null; - } - - @Override - public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Executor executor) { - return null; - } - - @Override - public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { - return null; - } - - @Override - public ACLBackgroundPathAndBytesable<String> storingStatIn(Stat stat) { - return null; - } - - } - - private class MockCreateBuilder implements CreateBuilder { - - private boolean createParents = false; - private CreateMode createMode = CreateMode.PERSISTENT; - - @Override - public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentsIfNeeded() { - createParents = true; - return new MockProtectACLCreateModeStatPathAndBytesable<>() { - - @Override - public String forPath(String s, byte[] bytes) throws Exception { - return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners); - } - - @Override - public String forPath(String s) throws Exception { - return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners); - } - - }; - } - - @Override - public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentContainersIfNeeded() { - return new MockProtectACLCreateModeStatPathAndBytesable<>() { - - @Override - public String forPath(String s, byte[] bytes) throws Exception { - return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners); - } - - @Override - public String forPath(String s) throws Exception { - return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners); - } - - }; - } - - @Override - @Deprecated - public ACLPathAndBytesable<String> withProtectedEphemeralSequential() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ACLCreateModeStatBackgroundPathAndBytesable<String> withProtection() { - return null; - } - - public String forPath(String s) throws Exception { - return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners); - } - - public String forPath(String s, byte[] bytes) throws Exception { - return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners); - } - - @Override - public ErrorListenerPathAndBytesable<String> inBackground() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable<String> inBackground(Object o) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Executor executor) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public CreateBuilderMain withTtl(long l) { - return null; - } - - @Override - public CreateBuilder2 orSetData() { - return null; - } - - @Override - public CreateBuilder2 orSetData(int i) { - return null; - } - - @Override - public CreateBackgroundModeStatACLable compressed() { - return null; - } - - @Override - public CreateProtectACLCreateModePathAndBytesable<String> storingStatIn(Stat stat) { - return null; - } - - @Override - public BackgroundPathAndBytesable<String> withACL(List<ACL> list) { - return null; - } - - @Override - public ACLBackgroundPathAndBytesable<String> withMode(CreateMode createMode) { - this.createMode = createMode; - return this; - } - - @Override - public BackgroundPathAndBytesable<String> withACL(List<ACL> list, boolean b) { - return null; - } - } - - private static class MockBackgroundPathableBuilder<T> implements BackgroundPathable<T>, Watchable<BackgroundPathable<T>> { - - @Override - public ErrorListenerPathable<T> inBackground() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable<T> inBackground(Object o) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Object o) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Executor executor) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public T forPath(String s) throws Exception { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public BackgroundPathable<T> watched() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public BackgroundPathable<T> usingWatcher(Watcher watcher) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public BackgroundPathable<T> usingWatcher(CuratorWatcher curatorWatcher) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - } - - private class MockGetChildrenBuilder extends MockBackgroundPathableBuilder<List<String>> implements GetChildrenBuilder { - - @Override - public WatchPathable<List<String>> storingStatIn(Stat stat) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public List<String> forPath(String path) throws Exception { - return getChildren(path, fileSystem.root()); - } - - } - - private class MockExistsBuilder extends MockBackgroundPathableBuilder<Stat> implements ExistsBuilder { - - @Override - public Stat forPath(String path) throws Exception { - try { - Node node = getNode(path, fileSystem.root()); - Stat stat = new Stat(); - stat.setVersion(node.version()); - return stat; - } - catch (KeeperException.NoNodeException e) { - return null; - } - } - - @Override - public ACLableExistBuilderMain creatingParentsIfNeeded() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ACLableExistBuilderMain creatingParentContainersIfNeeded() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - } - - private class MockDeleteBuilder extends MockBackgroundPathableBuilder<Void> implements DeleteBuilder { - - private boolean deleteChildren = false; - - @Override - public BackgroundVersionable deletingChildrenIfNeeded() { - deleteChildren = true; - return this; - } - - @Override - public ChildrenDeletable guaranteed() { - return this; - } - - @Override - public BackgroundPathable<Void> withVersion(int i) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - public Void forPath(String pathString) throws Exception { - deleteNode(pathString, deleteChildren, fileSystem.root(), listeners); - return null; - } - - @Override - public DeleteBuilderMain quietly() { - return this; - } - } - - private class MockGetDataBuilder extends MockBackgroundPathableBuilder<byte[]> implements GetDataBuilder { - - @Override - public GetDataWatchBackgroundStatable decompressed() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - public byte[] forPath(String path) throws Exception { - return getData(path, fileSystem.root()); - } - - @Override - public ErrorListenerPathable<byte[]> inBackground() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable<byte[]> inBackground(Object o) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback, Object o) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback, Executor executor) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public WatchPathable<byte[]> storingStatIn(Stat stat) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - } - - // extends MockBackgroundACLPathAndBytesableBuilder<Stat> - private class MockSetDataBuilder implements SetDataBuilder { - - @Override - public SetDataBackgroundVersionable compressed() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public BackgroundPathAndBytesable<Stat> withVersion(int i) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public Stat forPath(String path, byte[] bytes) throws Exception { - setData(path, bytes, fileSystem.root(), listeners); - return null; - } - - @Override - public Stat forPath(String s) throws Exception { - return null; - } - - @Override - public ErrorListenerPathAndBytesable<Stat> inBackground() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable<Stat> inBackground(Object o) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Object o) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Executor executor) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - } - - /** Allows addition of directoryListeners which are never called */ - private class MockListenable<T> implements Listenable<T> { - - @Override - public void addListener(T t) { - } - - @Override - public void addListener(T t, Executor executor) { - } - - @Override - public void removeListener(T t) { - } - - } - - private class MockCuratorTransactionFinal implements CuratorTransactionFinal { - - /** The new directory root in which the transactional changes are made */ - private Node newRoot; - - private boolean committed = false; - - private final DelayedListener delayedListener = new DelayedListener(); - - public MockCuratorTransactionFinal() { - newRoot = fileSystem.root().clone(); - } - - @Override - public Collection<CuratorTransactionResult> commit() throws Exception { - fileSystem.replaceRoot(newRoot); - committed = true; - delayedListener.commit(); - return null; // TODO - } - - @Override - public TransactionCreateBuilder create() { - ensureNotCommitted(); - return new MockTransactionCreateBuilder(); - } - - @Override - public TransactionDeleteBuilder delete() { - ensureNotCommitted(); - return new MockTransactionDeleteBuilder(); - } - - @Override - public TransactionSetDataBuilder setData() { - ensureNotCommitted(); - return new MockTransactionSetDataBuilder(); - } - - @Override - public TransactionCheckBuilder check() { - ensureNotCommitted(); - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - private void ensureNotCommitted() { - if (committed) throw new IllegalStateException("transaction already committed"); - } - - private class MockTransactionCreateBuilder implements TransactionCreateBuilder { - - private CreateMode createMode = CreateMode.PERSISTENT; - - @Override - public ACLCreateModePathAndBytesable<CuratorTransactionBridge> compressed() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ACLPathAndBytesable<CuratorTransactionBridge> withMode(CreateMode createMode) { - this.createMode = createMode; - return this; - } - - @Override - public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception { - createNode(s, bytes, false, createMode, newRoot, delayedListener); - return new MockCuratorTransactionBridge(); - } - - @Override - public CuratorTransactionBridge forPath(String s) throws Exception { - createNode(s, new byte[0], false, createMode, newRoot, delayedListener); - return new MockCuratorTransactionBridge(); - } - - @Override - public TransactionCreateBuilder2 withTtl(long l) { - return this; - } - - @Override - public Object withACL(List list, boolean b) { - return this; - } - - @Override - public Object withACL(List list) { - return this; - } - } - - private class MockTransactionDeleteBuilder implements TransactionDeleteBuilder { - - @Override - public Pathable<CuratorTransactionBridge> withVersion(int i) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public CuratorTransactionBridge forPath(String path) throws Exception { - deleteNode(path, false, newRoot, delayedListener); - return new MockCuratorTransactionBridge(); - } - - } - - private class MockTransactionSetDataBuilder implements TransactionSetDataBuilder { - - @Override - public VersionPathAndBytesable<CuratorTransactionBridge> compressed() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public PathAndBytesable<CuratorTransactionBridge> withVersion(int i) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception { - MockCurator.this.setData(s, bytes, newRoot, delayedListener); - return new MockCuratorTransactionBridge(); - } - - @Override - public CuratorTransactionBridge forPath(String s) throws Exception { - MockCurator.this.setData(s, new byte[0], newRoot, delayedListener); - return new MockCuratorTransactionBridge(); - } - - } - - private class MockCuratorTransactionBridge implements CuratorTransactionBridge { - - @Override - public CuratorTransactionFinal and() { - return MockCuratorTransactionFinal.this; - } - - } - - /** A class which collects listen events and forwards them to the regular directoryListeners on commit */ - private class DelayedListener extends Listeners { - - private final List<Pair<Path, PathChildrenCacheEvent>> events = new ArrayList<>(); - - @Override - public void notify(Path path, PathChildrenCacheEvent event) { - events.add(new Pair<>(path, event)); - } - - public void commit() { - for (Pair<Path, PathChildrenCacheEvent> event : events) - listeners.notify(event.getFirst(), event.getSecond()); - } - - } - - } - - private class MockCuratorFramework implements CuratorFramework { - - private CuratorFrameworkState curatorState = CuratorFrameworkState.LATENT; - - @Override - public void start() { - curatorState = CuratorFrameworkState.STARTED; - } - - @Override - public void close() { - curatorState = CuratorFrameworkState.STOPPED; - } - - @Override - public CuratorFrameworkState getState() { - return curatorState; - } - - @Override - @Deprecated - public boolean isStarted() { - return curatorState == CuratorFrameworkState.STARTED; - } - - @Override - public CreateBuilder create() { - return new MockCreateBuilder(); - } - - @Override - public DeleteBuilder delete() { - return new MockDeleteBuilder(); - } - - @Override - public ExistsBuilder checkExists() { - return new MockExistsBuilder(); - } - - @Override - public GetDataBuilder getData() { - return new MockGetDataBuilder(); - } - - @Override - public SetDataBuilder setData() { - return new MockSetDataBuilder(); - } - - @Override - public GetChildrenBuilder getChildren() { - return new MockGetChildrenBuilder(); - } - - @Override - public GetACLBuilder getACL() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public SetACLBuilder setACL() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ReconfigBuilder reconfig() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public GetConfigBuilder getConfig() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public CuratorTransaction inTransaction() { - return new MockCuratorTransactionFinal(); - } - - @Override - public CuratorMultiTransaction transaction() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public TransactionOp transactionOp() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - @Deprecated - public void sync(String path, Object backgroundContextObject) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public void createContainers(String s) throws Exception { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public Listenable<ConnectionStateListener> getConnectionStateListenable() { - return new MockListenable<>(); - } - - @Override - public Listenable<CuratorListener> getCuratorListenable() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - @Deprecated - public CuratorFramework nonNamespaceView() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public CuratorFramework usingNamespace(String newNamespace) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public String getNamespace() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public CuratorZookeeperClient getZookeeperClient() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Deprecated - @Override - public EnsurePath newNamespaceAwareEnsurePath(String path) { - return new EnsurePath(path); - } - - @Override - public void clearWatcherReferences(Watcher watcher) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public boolean blockUntilConnected(int i, TimeUnit timeUnit) throws InterruptedException { - return true; - } - - @Override - public void blockUntilConnected() throws InterruptedException { - - } - - @Override - public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework() { - return new WatcherRemoveCuratorFramework() { - @Override - public void removeWatchers() { - - } - - @Override - public void start() { - - } - - @Override - public void close() { - - } - - @Override - public CuratorFrameworkState getState() { - return null; - } - - @Override - public boolean isStarted() { - return false; - } - - @Override - public CreateBuilder create() { - return null; - } - - @Override - public DeleteBuilder delete() { - return null; - } - - @Override - public ExistsBuilder checkExists() { - return null; - } - - @Override - public GetDataBuilder getData() { - return null; - } - - @Override - public SetDataBuilder setData() { - return null; - } - - @Override - public GetChildrenBuilder getChildren() { - return null; - } - - @Override - public GetACLBuilder getACL() { - return null; - } - - @Override - public SetACLBuilder setACL() { - return null; - } - - @Override - public ReconfigBuilder reconfig() { - return null; - } - - @Override - public GetConfigBuilder getConfig() { - return null; - } - - @Override - public CuratorTransaction inTransaction() { - return null; - } - - @Override - public CuratorMultiTransaction transaction() { - return null; - } - - @Override - public TransactionOp transactionOp() { - return null; - } - - @Override - public void sync(String s, Object o) { - - } - - @Override - public void createContainers(String s) throws Exception { - - } - - @Override - public SyncBuilder sync() { - return null; - } - - @Override - public RemoveWatchesBuilder watches() { - return null; - } - - @Override - public Listenable<ConnectionStateListener> getConnectionStateListenable() { - return null; - } - - @Override - public Listenable<CuratorListener> getCuratorListenable() { - return null; - } - - @Override - public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() { - return null; - } - - @Override - public CuratorFramework nonNamespaceView() { - return null; - } - - @Override - public CuratorFramework usingNamespace(String s) { - return null; - } - - @Override - public String getNamespace() { - return null; - } - - @Override - public CuratorZookeeperClient getZookeeperClient() { - return null; - } - - @Override - public EnsurePath newNamespaceAwareEnsurePath(String s) { - return null; - } - - @Override - public void clearWatcherReferences(Watcher watcher) { - - } - - @Override - public boolean blockUntilConnected(int i, TimeUnit timeUnit) throws InterruptedException { - return false; - } - - @Override - public void blockUntilConnected() throws InterruptedException { - - } - - @Override - public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework() { - return null; - } - - @Override - public ConnectionStateErrorPolicy getConnectionStateErrorPolicy() { - return null; - } - - @Override - public QuorumVerifier getCurrentConfig() { - return null; - } - - @Override - public SchemaSet getSchemaSet() { - return null; - } - - @Override - public boolean isZk34CompatibilityMode() { - return false; - } - - @Override - public CompletableFuture<Void> runSafe(Runnable runnable) { - return null; - } - }; - - } - - @Override - public ConnectionStateErrorPolicy getConnectionStateErrorPolicy() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public QuorumVerifier getCurrentConfig() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public SchemaSet getSchemaSet() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public boolean isZk34CompatibilityMode() { - return false; - } - - @Override - public CompletableFuture<Void> runSafe(Runnable runnable) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public SyncBuilder sync() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public RemoveWatchesBuilder watches() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - } - } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java new file mode 100644 index 00000000000..401c6e90cd8 --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java @@ -0,0 +1,1356 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator.mock; + +import com.google.common.util.concurrent.UncheckedTimeoutException; +import com.yahoo.collections.Pair; +import com.yahoo.concurrent.Lock; +import com.yahoo.concurrent.Locks; +import com.yahoo.path.Path; +import com.yahoo.vespa.curator.CompletionTimeoutException; +import com.yahoo.vespa.curator.Curator; +import com.yahoo.vespa.curator.recipes.CuratorLockException; +import org.apache.curator.CuratorZookeeperClient; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.WatcherRemoveCuratorFramework; +import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable; +import org.apache.curator.framework.api.ACLCreateModeBackgroundPathAndBytesable; +import org.apache.curator.framework.api.ACLCreateModePathAndBytesable; +import org.apache.curator.framework.api.ACLCreateModeStatBackgroundPathAndBytesable; +import org.apache.curator.framework.api.ACLPathAndBytesable; +import org.apache.curator.framework.api.ACLableExistBuilderMain; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.BackgroundPathAndBytesable; +import org.apache.curator.framework.api.BackgroundPathable; +import org.apache.curator.framework.api.BackgroundVersionable; +import org.apache.curator.framework.api.ChildrenDeletable; +import org.apache.curator.framework.api.CreateBackgroundModeStatACLable; +import org.apache.curator.framework.api.CreateBuilder; +import org.apache.curator.framework.api.CreateBuilder2; +import org.apache.curator.framework.api.CreateBuilderMain; +import org.apache.curator.framework.api.CreateProtectACLCreateModePathAndBytesable; +import org.apache.curator.framework.api.CuratorListener; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.DeleteBuilder; +import org.apache.curator.framework.api.DeleteBuilderMain; +import org.apache.curator.framework.api.ErrorListenerPathAndBytesable; +import org.apache.curator.framework.api.ErrorListenerPathable; +import org.apache.curator.framework.api.ExistsBuilder; +import org.apache.curator.framework.api.GetACLBuilder; +import org.apache.curator.framework.api.GetChildrenBuilder; +import org.apache.curator.framework.api.GetConfigBuilder; +import org.apache.curator.framework.api.GetDataBuilder; +import org.apache.curator.framework.api.GetDataWatchBackgroundStatable; +import org.apache.curator.framework.api.PathAndBytesable; +import org.apache.curator.framework.api.Pathable; +import org.apache.curator.framework.api.ProtectACLCreateModeStatPathAndBytesable; +import org.apache.curator.framework.api.ReconfigBuilder; +import org.apache.curator.framework.api.RemoveWatchesBuilder; +import org.apache.curator.framework.api.SetACLBuilder; +import org.apache.curator.framework.api.SetDataBackgroundVersionable; +import org.apache.curator.framework.api.SetDataBuilder; +import org.apache.curator.framework.api.SyncBuilder; +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.framework.api.VersionPathAndBytesable; +import org.apache.curator.framework.api.WatchPathable; +import org.apache.curator.framework.api.Watchable; +import org.apache.curator.framework.api.transaction.CuratorMultiTransaction; +import org.apache.curator.framework.api.transaction.CuratorTransaction; +import org.apache.curator.framework.api.transaction.CuratorTransactionBridge; +import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; +import org.apache.curator.framework.api.transaction.CuratorTransactionResult; +import org.apache.curator.framework.api.transaction.TransactionCheckBuilder; +import org.apache.curator.framework.api.transaction.TransactionCreateBuilder; +import org.apache.curator.framework.api.transaction.TransactionCreateBuilder2; +import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder; +import org.apache.curator.framework.api.transaction.TransactionOp; +import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.recipes.atomic.AtomicStats; +import org.apache.curator.framework.recipes.atomic.AtomicValue; +import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.recipes.locks.InterProcessLock; +import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; +import org.apache.curator.framework.schema.SchemaSet; +import org.apache.curator.framework.state.ConnectionStateErrorPolicy; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryForever; +import org.apache.curator.utils.EnsurePath; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; + +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A mock implementation of{@link CuratorFramework} for testing purposes. + * + * @author mpolden + */ +public class MockCuratorFramework implements CuratorFramework { + + private final boolean shouldTimeoutOnEnter; + private final boolean stableOrdering; + private final Locks<String> locks = new Locks<>(Long.MAX_VALUE, TimeUnit.DAYS); + + /** The file system used by this mock to store zookeeper files and directories */ + private final MemoryFileSystem fileSystem = new MemoryFileSystem(); + + /** Atomic counters. A more accurate mock would store these as files in the file system */ + private final Map<String, MockAtomicCounter> atomicCounters = new ConcurrentHashMap<>(); + + /** Listeners to changes to a particular path */ + private final ListenerMap listeners = new ListenerMap(); + + private CuratorFrameworkState curatorState = CuratorFrameworkState.LATENT; + private int monotonicallyIncreasingNumber = 0; + + public MockCuratorFramework(boolean stableOrdering, boolean shouldTimeoutOnEnter) { + this.stableOrdering = stableOrdering; + this.shouldTimeoutOnEnter = shouldTimeoutOnEnter; + } + + public Map<String, MockAtomicCounter> atomicCounters() { + return Collections.unmodifiableMap(atomicCounters); + } + + public MemoryFileSystem fileSystem() { + return fileSystem; + } + + @Override + public void start() { + curatorState = CuratorFrameworkState.STARTED; + } + + @Override + public void close() { + curatorState = CuratorFrameworkState.STOPPED; + } + + @Override + public CuratorFrameworkState getState() { + return curatorState; + } + + @Override + @Deprecated + public boolean isStarted() { + return curatorState == CuratorFrameworkState.STARTED; + } + + @Override + public CreateBuilder create() { + return new MockCreateBuilder(); + } + + @Override + public DeleteBuilder delete() { + return new MockDeleteBuilder(); + } + + @Override + public ExistsBuilder checkExists() { + return new MockExistsBuilder(); + } + + @Override + public GetDataBuilder getData() { + return new MockGetDataBuilder(); + } + + @Override + public SetDataBuilder setData() { + return new MockSetDataBuilder(); + } + + @Override + public GetChildrenBuilder getChildren() { + return new MockGetChildrenBuilder(); + } + + @Override + public GetACLBuilder getACL() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public SetACLBuilder setACL() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + + @Override + public ReconfigBuilder reconfig() { throw new UnsupportedOperationException("Not implemented in MockCurator"); } + + @Override + public GetConfigBuilder getConfig() { throw new UnsupportedOperationException("Not implemented in MockCurator"); } + + @Override + public CuratorTransaction inTransaction() { + return new MockCuratorTransactionFinal(); + } + + @Override + public CuratorMultiTransaction transaction() { throw new UnsupportedOperationException("Not implemented in MockCurator"); } + + @Override + public TransactionOp transactionOp() { throw new UnsupportedOperationException("Not implemented in MockCurator"); } + + @Override + public RemoveWatchesBuilder watches() { throw new UnsupportedOperationException("Not implemented in MockCurator"); } + + @Override + public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework() { throw new UnsupportedOperationException("Not implemented in MockCurator"); } + + @Override + public ConnectionStateErrorPolicy getConnectionStateErrorPolicy() { throw new UnsupportedOperationException("Not implemented in MockCurator"); } + + @Override + public QuorumVerifier getCurrentConfig() { throw new UnsupportedOperationException("Not implemented in MockCurator"); } + + @Override + public SchemaSet getSchemaSet() { throw new UnsupportedOperationException("Not implemented in MockCurator"); } + + @Override + public boolean isZk34CompatibilityMode() { return false; } + + @Override + public CompletableFuture<Void> runSafe(Runnable runnable) { throw new UnsupportedOperationException("Not implemented in MockCurator"); } + + @Override + @Deprecated + public void sync(String path, Object backgroundContextObject) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public void createContainers(String s) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public Listenable<ConnectionStateListener> getConnectionStateListenable() { + return new MockListenable<>(); + } + + @Override + public Listenable<CuratorListener> getCuratorListenable() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + @Deprecated + public CuratorFramework nonNamespaceView() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public CuratorFramework usingNamespace(String newNamespace) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public String getNamespace() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public CuratorZookeeperClient getZookeeperClient() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Deprecated + @Override + public EnsurePath newNamespaceAwareEnsurePath(String path) { + return new EnsurePath(path); + } + + @Override + public void clearWatcherReferences(Watcher watcher) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public boolean blockUntilConnected(int i, TimeUnit timeUnit) { + return true; + } + + @Override + public void blockUntilConnected() { + } + + @Override + public SyncBuilder sync() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + // ----- Factory methods for mocks */ + + public InterProcessLock createMutex(String path) { + return new MockLock(path); + } + + public MockAtomicCounter createAtomicCounter(String path) { + return atomicCounters.computeIfAbsent(path, (k) -> new MockAtomicCounter(path)); + } + + public Curator.CompletionWaiter createCompletionWaiter() { + return new MockCompletionWaiter(); + } + + public Curator.DirectoryCache createDirectoryCache(String path) { + return new MockDirectoryCache(Path.fromString(path)); + } + + public Curator.FileCache createFileCache(String path) { + return new MockFileCache(Path.fromString(path)); + } + + // ----- Start of adaptor methods from Curator to the mock file system ----- + + /** Creates a node below the given directory root */ + private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, MemoryFileSystem.Node root, Listeners listeners) + throws KeeperException.NodeExistsException, KeeperException.NoNodeException { + validatePath(pathString); + Path path = Path.fromString(pathString); + if (path.isRoot()) return "/"; // the root already exists + MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), createParents); + String name = nodeName(path.getName(), createMode); + + if (parent == null) + throw new KeeperException.NoNodeException(path.getParentPath().toString()); + if (parent.children().containsKey(path.getName())) + throw new KeeperException.NodeExistsException(path.toString()); + + parent.add(name).setContent(content); + String nodePath = "/" + path.getParentPath().toString() + "/" + name; + listeners.notify(Path.fromString(nodePath), content, PathChildrenCacheEvent.Type.CHILD_ADDED); + return nodePath; + } + + /** Deletes a node below the given directory root */ + private void deleteNode(String pathString, boolean deleteChildren, MemoryFileSystem.Node root, Listeners listeners) + throws KeeperException.NoNodeException, KeeperException.NotEmptyException { + validatePath(pathString); + Path path = Path.fromString(pathString); + MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false); + if (parent == null) throw new KeeperException.NoNodeException(path.toString()); + MemoryFileSystem.Node node = parent.children().get(path.getName()); + if (node == null) throw new KeeperException.NoNodeException(path.getName() + " under " + parent); + if ( ! node.children().isEmpty() && ! deleteChildren) + throw new KeeperException.NotEmptyException(path.toString()); + parent.remove(path.getName()); + listeners.notify(path, new byte[0], PathChildrenCacheEvent.Type.CHILD_REMOVED); + } + + /** Returns the data of a node */ + private byte[] getData(String pathString, MemoryFileSystem.Node root) throws KeeperException.NoNodeException { + validatePath(pathString); + return getNode(pathString, root).getContent(); + } + + /** sets the data of an existing node */ + private void setData(String pathString, byte[] content, MemoryFileSystem.Node root, Listeners listeners) + throws KeeperException.NoNodeException { + validatePath(pathString); + getNode(pathString, root).setContent(content); + listeners.notify(Path.fromString(pathString), content, PathChildrenCacheEvent.Type.CHILD_UPDATED); + } + + private List<String> getChildren(String path, MemoryFileSystem.Node root) throws KeeperException.NoNodeException { + validatePath(path); + MemoryFileSystem.Node node = root.getNode(Paths.get(path), false); + if (node == null) throw new KeeperException.NoNodeException(path); + List<String> children = new ArrayList<>(node.children().keySet()); + if (! stableOrdering) + Collections.shuffle(children); + return children; + } + + /** Returns a node or throws the appropriate exception if it doesn't exist */ + private MemoryFileSystem.Node getNode(String pathString, MemoryFileSystem.Node root) throws KeeperException.NoNodeException { + validatePath(pathString); + Path path = Path.fromString(pathString); + MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false); + if (parent == null) throw new KeeperException.NoNodeException(path.toString()); + MemoryFileSystem.Node node = parent.children().get(path.getName()); + if (node == null) throw new KeeperException.NoNodeException(path.toString()); + return node; + } + + private String nodeName(String baseName, CreateMode createMode) { + switch (createMode) { + case PERSISTENT: case EPHEMERAL: return baseName; + case PERSISTENT_SEQUENTIAL: case EPHEMERAL_SEQUENTIAL: return baseName + monotonicallyIncreasingNumber++; + default: throw new UnsupportedOperationException(createMode + " support not implemented in MockCurator"); + } + } + + /** Validates a path using the same rules as ZooKeeper */ + public static String validatePath(String path) throws IllegalArgumentException { + if (path == null) throw new IllegalArgumentException("Path cannot be null"); + if (path.length() == 0) throw new IllegalArgumentException("Path length must be > 0"); + if (path.charAt(0) != '/') throw new IllegalArgumentException("Path must start with / character"); + if (path.length() == 1) return path; // done checking - it's the root + if (path.charAt(path.length() - 1) == '/') + throw new IllegalArgumentException("Path must not end with / character"); + + String reason = null; + char lastc = '/'; + char[] chars = path.toCharArray(); + char c; + for (int i = 1; i < chars.length; lastc = chars[i], i++) { + c = chars[i]; + + if (c == 0) { + reason = "null character not allowed @" + i; + break; + } else if (c == '/' && lastc == '/') { + reason = "empty node name specified @" + i; + break; + } else if (c == '.' && lastc == '.') { + if (chars[i-2] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) { + reason = "relative paths not allowed @" + i; + break; + } + } else if (c == '.') { + if (chars[i-1] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) { + reason = "relative paths not allowed @" + i; + break; + } + } else if (c > '\u0000' && c < '\u001f' || c > '\u007f' && c < '\u009F' + || c > '\ud800' && c < '\uf8ff' || c > '\ufff0' && c < '\uffff') { + reason = "invalid charater @" + i; + break; + } + } + + if (reason != null) + throw new IllegalArgumentException("Invalid path string \"" + path + "\" caused by " + reason); + return path; + } + + /** + * Invocation of changes to the file system state is abstracted through this to allow transactional + * changes to notify on commit + */ + private abstract static class Listeners { + + /** Translating method */ + public final void notify(Path path, byte[] data, PathChildrenCacheEvent.Type type) { + String pathString = "/" + path.toString(); // this silly path class strips the leading "/" :-/ + PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, new ChildData(pathString, null, data)); + notify(path, event); + } + + public abstract void notify(Path path, PathChildrenCacheEvent event); + + } + + /** The regular listener implementation which notifies registered file and directory listeners */ + private class ListenerMap extends Listeners { + + private final Map<Path, PathChildrenCacheListener> directoryListeners = new ConcurrentHashMap<>(); + private final Map<Path, NodeCacheListener> fileListeners = new ConcurrentHashMap<>(); + + public void add(Path path, PathChildrenCacheListener listener) { + directoryListeners.put(path, listener); + } + + public void add(Path path, NodeCacheListener listener) { + fileListeners.put(path, listener); + } + + @Override + public void notify(Path path, PathChildrenCacheEvent event) { + try { + // Snapshot directoryListeners in case notification leads to new directoryListeners added + Set<Map.Entry<Path, PathChildrenCacheListener>> directoryListenerSnapshot = new HashSet<>(directoryListeners.entrySet()); + for (Map.Entry<Path, PathChildrenCacheListener> listener : directoryListenerSnapshot) { + if (path.isChildOf(listener.getKey())) + listener.getValue().childEvent(MockCuratorFramework.this, event); + } + + // Snapshot directoryListeners in case notification leads to new directoryListeners added + Set<Map.Entry<Path, NodeCacheListener>> fileListenerSnapshot = new HashSet<>(fileListeners.entrySet()); + for (Map.Entry<Path, NodeCacheListener> listener : fileListenerSnapshot) { + if (path.equals(listener.getKey())) + listener.getValue().nodeChanged(); + } + } + catch (Exception e) { + e.printStackTrace(); // TODO: Remove + throw new RuntimeException("Exception notifying listeners", e); + } + } + + } + + private class MockCompletionWaiter implements Curator.CompletionWaiter { + + @Override + public void awaitCompletion(Duration timeout) { + if (shouldTimeoutOnEnter) { + throw new CompletionTimeoutException(""); + } + } + + @Override + public void notifyCompletion() { + } + + } + + /** A lock which works inside a single vm */ + private class MockLock extends InterProcessSemaphoreMutex { + + public boolean timeoutOnLock = false; + public boolean throwExceptionOnLock = false; + + private final String path; + + private Lock lock = null; + + public MockLock(String path) { + super(MockCuratorFramework.this, path); + this.path = path; + } + + @Override + public boolean acquire(long timeout, TimeUnit unit) { + if (throwExceptionOnLock) + throw new CuratorLockException("Thrown by mock"); + if (timeoutOnLock) return false; + + try { + lock = locks.lock(path, timeout, unit); + return true; + } + catch (UncheckedTimeoutException e) { + return false; + } + } + + @Override + public void acquire() { + if (throwExceptionOnLock) + throw new CuratorLockException("Thrown by mock"); + + lock = locks.lock(path); + } + + @Override + public void release() { + if (lock != null) + lock.close(); + } + + } + + private class MockAtomicCounter extends DistributedAtomicLong { + + private boolean initialized = false; + private MockLongValue value = new MockLongValue(0); // yes, uninitialized returns 0 :-/ + + public MockAtomicCounter(String path) { + super(MockCuratorFramework.this, path, new RetryForever(1_000)); + } + + @Override + public boolean initialize(Long value) { + if (initialized) return false; + this.value = new MockLongValue(value); + initialized = true; + return true; + } + + @Override + public AtomicValue<Long> get() { + if (value == null) return new MockLongValue(0); + return value; + } + + public AtomicValue<Long> add(Long delta) { + return trySet(value.postValue() + delta); + } + + public AtomicValue<Long> subtract(Long delta) { + return trySet(value.postValue() - delta); + } + + @Override + public AtomicValue<Long> increment() { + return trySet(value.postValue() + 1); + } + + public AtomicValue<Long> decrement() { + return trySet(value.postValue() - 1); + } + + @Override + public AtomicValue<Long> trySet(Long longval) { + value = new MockLongValue(longval); + return value; + } + + public void forceSet(Long newValue) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + public AtomicValue<Long> compareAndSet(Long expectedValue, Long newValue) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + } + + private static class MockLongValue implements AtomicValue<Long> { + + private final AtomicLong value = new AtomicLong(); + + public MockLongValue(long value) { + this.value.set(value); + } + + @Override + public boolean succeeded() { + return true; + } + + public void setValue(long value) { + this.value.set(value); + } + + @Override + public Long preValue() { + return value.get(); + } + + @Override + public Long postValue() { + return value.get(); + } + + @Override + public AtomicStats getStats() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + } + + private class MockDirectoryCache implements Curator.DirectoryCache { + + /** The path this is caching and listening to */ + private final Path path; + + public MockDirectoryCache(Path path) { + this.path = path; + } + + @Override + public void start() {} + + @Override + public void addListener(PathChildrenCacheListener listener) { + listeners.add(path, listener); + } + + @Override + public List<ChildData> getCurrentData() { + List<ChildData> childData = new ArrayList<>(); + for (String childName : getChildren(path)) { + Path childPath = path.append(childName); + childData.add(new ChildData(childPath.getAbsolute(), null, getData(childPath).get())); + } + return childData; + } + + @Override + public ChildData getCurrentData(Path fullPath) { + if (!fullPath.getParentPath().equals(path)) { + throw new IllegalArgumentException("Path '" + fullPath + "' is not a child path of '" + path + "'"); + } + + return getData(fullPath).map(bytes -> new ChildData(fullPath.getAbsolute(), null, bytes)).orElse(null); + } + + @Override + public void close() {} + + private List<String> getChildren(Path path) { + try { + return MockCuratorFramework.this.getChildren().forPath(path.getAbsolute()); + } catch (KeeperException.NoNodeException e) { + return List.of(); + } catch (Exception e) { + throw new RuntimeException("Could not get children of " + path.getAbsolute(), e); + } + } + + private Optional<byte[]> getData(Path path) { + try { + return Optional.of(MockCuratorFramework.this.getData().forPath(path.getAbsolute())); + } + catch (KeeperException.NoNodeException e) { + return Optional.empty(); + } + catch (Exception e) { + throw new RuntimeException("Could not get data at " + path.getAbsolute(), e); + } + } + + } + + private class MockFileCache implements Curator.FileCache { + + /** The path this is caching and listening to */ + private final Path path; + + public MockFileCache(Path path) { + this.path = path; + } + + @Override + public void start() {} + + @Override + public void addListener(NodeCacheListener listener) { + listeners.add(path, listener); + } + + @Override + public ChildData getCurrentData() { + MemoryFileSystem.Node node = fileSystem.root().getNode(Paths.get(path.toString()), false); + if (node == null) return null; + return new ChildData("/" + path.toString(), null, node.getContent()); + } + + @Override + public void close() {} + + } + + + // ----- The rest of this file is adapting the Curator (non-recipe) API to the ----- + // ----- file system methods above. ----- + // ----- There's nothing to see unless you are interested in an illustration of ----- + // ----- the folly of fluent API's or, more generally, mankind. ----- + private abstract static class MockProtectACLCreateModeStatPathAndBytesable<String> + implements ProtectACLCreateModeStatPathAndBytesable<String> { + + public BackgroundPathAndBytesable<String> withACL(List<ACL> list) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + public BackgroundPathAndBytesable<String> withACL(List<ACL> list, boolean b) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + public ProtectACLCreateModeStatPathAndBytesable<String> withMode(CreateMode createMode) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ACLCreateModeBackgroundPathAndBytesable<java.lang.String> withProtection() { + return null; + } + + @Override + public ErrorListenerPathAndBytesable<String> inBackground() { + return null; + } + + @Override + public ErrorListenerPathAndBytesable<String> inBackground(Object o) { + return null; + } + + @Override + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback) { + return null; + } + + @Override + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o) { + return null; + } + + @Override + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Executor executor) { + return null; + } + + @Override + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { + return null; + } + + @Override + public ACLBackgroundPathAndBytesable<String> storingStatIn(Stat stat) { + return null; + } + + } + + private class MockCreateBuilder implements CreateBuilder { + + private boolean createParents = false; + private CreateMode createMode = CreateMode.PERSISTENT; + + @Override + public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentsIfNeeded() { + createParents = true; + return new MockProtectACLCreateModeStatPathAndBytesable<>() { + + @Override + public String forPath(String s, byte[] bytes) throws Exception { + return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners); + } + + @Override + public String forPath(String s) throws Exception { + return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners); + } + + }; + } + + @Override + public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentContainersIfNeeded() { + return new MockProtectACLCreateModeStatPathAndBytesable<>() { + + @Override + public String forPath(String s, byte[] bytes) throws Exception { + return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners); + } + + @Override + public String forPath(String s) throws Exception { + return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners); + } + + }; + } + + @Override + @Deprecated + public ACLPathAndBytesable<String> withProtectedEphemeralSequential() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ACLCreateModeStatBackgroundPathAndBytesable<String> withProtection() { + return null; + } + + public String forPath(String s) throws Exception { + return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners); + } + + public String forPath(String s, byte[] bytes) throws Exception { + return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners); + } + + @Override + public ErrorListenerPathAndBytesable<String> inBackground() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable<String> inBackground(Object o) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Executor executor) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public CreateBuilderMain withTtl(long l) { + return null; + } + + @Override + public CreateBuilder2 orSetData() { + return null; + } + + @Override + public CreateBuilder2 orSetData(int i) { + return null; + } + + @Override + public CreateBackgroundModeStatACLable compressed() { + return null; + } + + @Override + public CreateProtectACLCreateModePathAndBytesable<String> storingStatIn(Stat stat) { + return null; + } + + @Override + public BackgroundPathAndBytesable<String> withACL(List<ACL> list) { + return null; + } + + @Override + public ACLBackgroundPathAndBytesable<String> withMode(CreateMode createMode) { + this.createMode = createMode; + return this; + } + + @Override + public BackgroundPathAndBytesable<String> withACL(List<ACL> list, boolean b) { + return null; + } + } + + private static class MockBackgroundPathableBuilder<T> implements BackgroundPathable<T>, Watchable<BackgroundPathable<T>> { + + @Override + public ErrorListenerPathable<T> inBackground() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable<T> inBackground(Object o) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Object o) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Executor executor) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public T forPath(String s) throws Exception { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public BackgroundPathable<T> watched() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public BackgroundPathable<T> usingWatcher(Watcher watcher) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public BackgroundPathable<T> usingWatcher(CuratorWatcher curatorWatcher) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + } + + private class MockGetChildrenBuilder extends MockBackgroundPathableBuilder<List<String>> implements GetChildrenBuilder { + + @Override + public WatchPathable<List<String>> storingStatIn(Stat stat) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public List<String> forPath(String path) throws Exception { + return getChildren(path, fileSystem.root()); + } + + } + + private class MockExistsBuilder extends MockBackgroundPathableBuilder<Stat> implements ExistsBuilder { + + @Override + public Stat forPath(String path) throws Exception { + try { + MemoryFileSystem.Node node = getNode(path, fileSystem.root()); + Stat stat = new Stat(); + stat.setVersion(node.version()); + return stat; + } + catch (KeeperException.NoNodeException e) { + return null; + } + } + + @Override + public ACLableExistBuilderMain creatingParentsIfNeeded() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ACLableExistBuilderMain creatingParentContainersIfNeeded() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + } + + private class MockDeleteBuilder extends MockBackgroundPathableBuilder<Void> implements DeleteBuilder { + + private boolean deleteChildren = false; + + @Override + public BackgroundVersionable deletingChildrenIfNeeded() { + deleteChildren = true; + return this; + } + + @Override + public ChildrenDeletable guaranteed() { + return this; + } + + @Override + public BackgroundPathable<Void> withVersion(int i) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + public Void forPath(String pathString) throws Exception { + deleteNode(pathString, deleteChildren, fileSystem.root(), listeners); + return null; + } + + @Override + public DeleteBuilderMain quietly() { + return this; + } + } + + private class MockGetDataBuilder extends MockBackgroundPathableBuilder<byte[]> implements GetDataBuilder { + + @Override + public GetDataWatchBackgroundStatable decompressed() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + public byte[] forPath(String path) throws Exception { + return getData(path, fileSystem.root()); + } + + @Override + public ErrorListenerPathable<byte[]> inBackground() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable<byte[]> inBackground(Object o) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback, Object o) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback, Executor executor) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public WatchPathable<byte[]> storingStatIn(Stat stat) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + } + + // extends MockBackgroundACLPathAndBytesableBuilder<Stat> + private class MockSetDataBuilder implements SetDataBuilder { + + @Override + public SetDataBackgroundVersionable compressed() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public BackgroundPathAndBytesable<Stat> withVersion(int i) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public Stat forPath(String path, byte[] bytes) throws Exception { + setData(path, bytes, fileSystem.root(), listeners); + return null; + } + + @Override + public Stat forPath(String s) throws Exception { + return null; + } + + @Override + public ErrorListenerPathAndBytesable<Stat> inBackground() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable<Stat> inBackground(Object o) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Object o) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Executor executor) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + } + + /** Allows addition of directoryListeners which are never called */ + private class MockListenable<T> implements Listenable<T> { + + @Override + public void addListener(T t) { + } + + @Override + public void addListener(T t, Executor executor) { + } + + @Override + public void removeListener(T t) { + } + + } + + private class MockCuratorTransactionFinal implements CuratorTransactionFinal { + + /** The new directory root in which the transactional changes are made */ + private MemoryFileSystem.Node newRoot; + + private boolean committed = false; + + private final DelayedListener delayedListener = new DelayedListener(); + + public MockCuratorTransactionFinal() { + newRoot = fileSystem.root().clone(); + } + + @Override + public Collection<CuratorTransactionResult> commit() throws Exception { + fileSystem.replaceRoot(newRoot); + committed = true; + delayedListener.commit(); + return null; // TODO + } + + @Override + public TransactionCreateBuilder create() { + ensureNotCommitted(); + return new MockTransactionCreateBuilder(); + } + + @Override + public TransactionDeleteBuilder delete() { + ensureNotCommitted(); + return new MockTransactionDeleteBuilder(); + } + + @Override + public TransactionSetDataBuilder setData() { + ensureNotCommitted(); + return new MockTransactionSetDataBuilder(); + } + + @Override + public TransactionCheckBuilder check() { + ensureNotCommitted(); + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + private void ensureNotCommitted() { + if (committed) throw new IllegalStateException("transaction already committed"); + } + + private class MockTransactionCreateBuilder implements TransactionCreateBuilder { + + private CreateMode createMode = CreateMode.PERSISTENT; + + @Override + public ACLCreateModePathAndBytesable<CuratorTransactionBridge> compressed() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ACLPathAndBytesable<CuratorTransactionBridge> withMode(CreateMode createMode) { + this.createMode = createMode; + return this; + } + + @Override + public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception { + createNode(s, bytes, false, createMode, newRoot, delayedListener); + return new MockCuratorTransactionBridge(); + } + + @Override + public CuratorTransactionBridge forPath(String s) throws Exception { + createNode(s, new byte[0], false, createMode, newRoot, delayedListener); + return new MockCuratorTransactionBridge(); + } + + @Override + public TransactionCreateBuilder2 withTtl(long l) { + return this; + } + + @Override + public Object withACL(List list, boolean b) { + return this; + } + + @Override + public Object withACL(List list) { + return this; + } + } + + private class MockTransactionDeleteBuilder implements TransactionDeleteBuilder { + + @Override + public Pathable<CuratorTransactionBridge> withVersion(int i) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public CuratorTransactionBridge forPath(String path) throws Exception { + deleteNode(path, false, newRoot, delayedListener); + return new MockCuratorTransactionBridge(); + } + + } + + private class MockTransactionSetDataBuilder implements TransactionSetDataBuilder { + + @Override + public VersionPathAndBytesable<CuratorTransactionBridge> compressed() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public PathAndBytesable<CuratorTransactionBridge> withVersion(int i) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception { + MockCuratorFramework.this.setData(s, bytes, newRoot, delayedListener); + return new MockCuratorTransactionBridge(); + } + + @Override + public CuratorTransactionBridge forPath(String s) throws Exception { + MockCuratorFramework.this.setData(s, new byte[0], newRoot, delayedListener); + return new MockCuratorTransactionBridge(); + } + + } + + private class MockCuratorTransactionBridge implements CuratorTransactionBridge { + + @Override + public CuratorTransactionFinal and() { + return MockCuratorTransactionFinal.this; + } + + } + + /** A class which collects listen events and forwards them to the regular directoryListeners on commit */ + private class DelayedListener extends Listeners { + + private final List<Pair<Path, PathChildrenCacheEvent>> events = new ArrayList<>(); + + @Override + public void notify(Path path, PathChildrenCacheEvent event) { + events.add(new Pair<>(path, event)); + } + + public void commit() { + for (Pair<Path, PathChildrenCacheEvent> event : events) + listeners.notify(event.getFirst(), event.getSecond()); + } + + } + + } + +} diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/package-info.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/package-info.java index 07f0924cb31..777da5988eb 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/package-info.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/package-info.java @@ -1,6 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. @ExportPackage -@PublicApi +@PublicApi // TODO: Revoke this on Vespa 8. package com.yahoo.vespa.curator; import com.yahoo.api.annotations.PublicApi; import com.yahoo.osgi.annotation.ExportPackage; diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java new file mode 100644 index 00000000000..a518d8df843 --- /dev/null +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java @@ -0,0 +1,74 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator; + +import com.yahoo.net.HostName; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * @author mpolden + */ +public class ConnectionSpecTest { + + @Test + public void create() { + HostName.setHostNameForTestingOnly("host2"); + Config config = new Config(List.of(new Config.Server("host1", 10001), + new Config.Server("host2", 10002), + new Config.Server("host3", 10003))); + + { + ConnectionSpec spec = ConnectionSpec.create(config.servers, Config.Server::hostname, Config.Server::port, false); + assertEquals("host1:10001,host2:10002,host3:10003", spec.local()); + assertEquals("host1:10001,host2:10002,host3:10003", spec.ensemble()); + assertEquals(3, spec.ensembleSize()); + } + + { + ConnectionSpec specLocalAffinity = ConnectionSpec.create(config.servers, Config.Server::hostname, Config.Server::port, true); + assertEquals("host2:10002", specLocalAffinity.local()); + assertEquals("host1:10001,host2:10002,host3:10003", specLocalAffinity.ensemble()); + assertEquals(3, specLocalAffinity.ensembleSize()); + } + + { + ConnectionSpec specFromString = ConnectionSpec.create("host1:10001", "host1:10001,host2:10002"); + assertEquals("host1:10001", specFromString.local()); + assertEquals("host1:10001,host2:10002", specFromString.ensemble()); + assertEquals(2, specFromString.ensembleSize()); + } + } + + private static class Config { + + private final List<Server> servers; + + public Config(List<Server> servers) { + this.servers = servers; + } + + private static class Server { + + private final String hostname; + private final int port; + + public Server(String hostname, int port) { + this.hostname = hostname; + this.port = port; + } + + public String hostname() { + return hostname; + } + + public int port() { + return port; + } + } + + } + +} diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java index 2bf40c4e2bb..5341efaefe5 100644 --- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java @@ -1,7 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.curator; -import com.yahoo.cloud.config.ConfigserverConfig; +import com.yahoo.cloud.config.CuratorConfig; import com.yahoo.net.HostName; import org.apache.curator.test.TestingServer; import org.junit.After; @@ -61,45 +61,33 @@ public class CuratorTest { @Test public void require_that_server_count_is_correct() { - ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder(); - builder.zookeeperserver(createZKBuilder(localhost, port1)); - try (Curator curator = createCurator(new ConfigserverConfig(builder))) { + CuratorConfig.Builder builder = new CuratorConfig.Builder(); + builder.server(createZKBuilder(localhost, port1)); + try (Curator curator = createCurator(new CuratorConfig(builder))) { assertEquals(1, curator.zooKeeperEnsembleCount()); } } - @Test - public void localhost_affinity() { - String localhostHostName = "myhost"; - int localhostPort = 123; - - ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder(); - builder.zookeeperserver(createZKBuilder(localhostHostName, localhostPort)); - builder.zookeeperserver(createZKBuilder("otherhost", 345)); - ConfigserverConfig config = new ConfigserverConfig(builder); - - HostName.setHostNameForTestingOnly(localhostHostName); - - String localhostSpec = localhostHostName + ":" + localhostPort; - assertEquals(localhostSpec, Curator.createConnectionSpecForLocalhost(config)); - } - - private ConfigserverConfig createTestConfig() { - ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder(); - builder.zookeeperserver(createZKBuilder(localhost, port1)); - builder.zookeeperserver(createZKBuilder(localhost, port2)); - return new ConfigserverConfig(builder); + private CuratorConfig createTestConfig() { + CuratorConfig.Builder builder = new CuratorConfig.Builder(); + builder.server(createZKBuilder(localhost, port1)); + builder.server(createZKBuilder(localhost, port2)); + return new CuratorConfig(builder); } - private ConfigserverConfig.Zookeeperserver.Builder createZKBuilder(String hostname, int port) { - ConfigserverConfig.Zookeeperserver.Builder zkBuilder = new ConfigserverConfig.Zookeeperserver.Builder(); + private CuratorConfig.Server.Builder createZKBuilder(String hostname, int port) { + CuratorConfig.Server.Builder zkBuilder = new CuratorConfig.Server.Builder(); zkBuilder.hostname(hostname); zkBuilder.port(port); return zkBuilder; } - private Curator createCurator(ConfigserverConfig configserverConfig) { - return new Curator(configserverConfig, Optional.empty()); + private Curator createCurator(CuratorConfig curatorConfig) { + return new Curator(ConnectionSpec.create(curatorConfig.server(), + CuratorConfig.Server::hostname, + CuratorConfig.Server::port, + curatorConfig.zookeeperLocalhostAffinity()), + Optional.empty()); } private static class PortAllocator { |