aboutsummaryrefslogtreecommitdiffstats
path: root/zkfacade
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2020-11-26 11:43:38 +0100
committerHarald Musum <musum@verizonmedia.com>2020-11-26 11:43:38 +0100
commit3838f731e99e2a24871b3a90eb40d18579a56e40 (patch)
tree7e07cd3a87782c3f1a025e25127c8cdf5e2039d7 /zkfacade
parentc2895954f3465b6cdda26c84ecb6a9d1fb8e4a0d (diff)
parent693277054fd2f122ae40aa011e848526fad8a64e (diff)
Merge branch 'master' into revert-14062-revert-14057-hmusum/upgrade-to-curator-4
Diffstat (limited to 'zkfacade')
-rw-r--r--zkfacade/abi-spec.json18
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java102
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java142
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java2
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java19
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/api/package-info.java10
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java1561
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java1356
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/package-info.java2
-rw-r--r--zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java74
-rw-r--r--zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java46
11 files changed, 1661 insertions, 1671 deletions
diff --git a/zkfacade/abi-spec.json b/zkfacade/abi-spec.json
index e026559b283..e0de2622149 100644
--- a/zkfacade/abi-spec.json
+++ b/zkfacade/abi-spec.json
@@ -60,6 +60,7 @@
"com.yahoo.vespa.curator.Curator": {
"superClass": "java.lang.Object",
"interfaces": [
+ "com.yahoo.vespa.curator.api.VespaCurator",
"java.lang.AutoCloseable"
],
"attributes": [
@@ -68,6 +69,7 @@
"methods": [
"public static com.yahoo.vespa.curator.Curator create(java.lang.String)",
"public static com.yahoo.vespa.curator.Curator create(java.lang.String, java.util.Optional)",
+ "public void <init>(com.yahoo.cloud.config.CuratorConfig, com.yahoo.vespa.zookeeper.VespaZooKeeperServer)",
"public void <init>(com.yahoo.cloud.config.ConfigserverConfig, com.yahoo.vespa.zookeeper.VespaZooKeeperServer)",
"protected void <init>(java.lang.String, java.lang.String, java.util.function.Function)",
"public java.lang.String connectionSpec()",
@@ -89,7 +91,8 @@
"public org.apache.curator.framework.CuratorFramework framework()",
"public void close()",
"public java.lang.String zooKeeperEnsembleConnectionSpec()",
- "public int zooKeeperEnsembleCount()"
+ "public int zooKeeperEnsembleCount()",
+ "public bridge synthetic java.lang.AutoCloseable lock(com.yahoo.path.Path, java.time.Duration)"
],
"fields": [
"protected final org.apache.curator.RetryPolicy retryPolicy"
@@ -110,5 +113,18 @@
"public void close()"
],
"fields": []
+ },
+ "com.yahoo.vespa.curator.api.VespaCurator": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "interface",
+ "abstract"
+ ],
+ "methods": [
+ "public abstract java.lang.AutoCloseable lock(com.yahoo.path.Path, java.time.Duration)"
+ ],
+ "fields": []
}
} \ No newline at end of file
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java
new file mode 100644
index 00000000000..4409291419a
--- /dev/null
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java
@@ -0,0 +1,102 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.curator;
+
+import com.yahoo.net.HostName;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * A connection spec for Curator.
+ *
+ * @author mpolden
+ */
+class ConnectionSpec {
+
+ private final String local;
+ private final String ensemble;
+ private final int ensembleSize;
+
+ private ConnectionSpec(String local, String ensemble, int ensembleSize) {
+ this.local = requireNonEmpty(local, "local spec");
+ this.ensemble = requireNonEmpty(ensemble, "ensemble spec");
+ this.ensembleSize = ensembleSize;
+ }
+
+ /** Returns the local spec. This may be a subset of the ensemble spec */
+ public String local() {
+ return local;
+ }
+
+ /** Returns the ensemble spec. This always contains all nodes in the ensemble */
+ public String ensemble() {
+ return ensemble;
+ }
+
+ /** Returns the number of servers in the ensemble */
+ public int ensembleSize() {
+ return ensembleSize;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ConnectionSpec that = (ConnectionSpec) o;
+ return ensembleSize == that.ensembleSize &&
+ local.equals(that.local) &&
+ ensemble.equals(that.ensemble);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(local, ensemble, ensembleSize);
+ }
+
+ public static ConnectionSpec create(String spec) {
+ return create(spec, spec);
+ }
+
+ public static ConnectionSpec create(String localSpec, String ensembleSpec) {
+ return new ConnectionSpec(localSpec, ensembleSpec, ensembleSpec.split(",").length);
+ }
+
+ public static <T> ConnectionSpec create(List<T> servers,
+ Function<T, String> hostnameGetter,
+ Function<T, Integer> portGetter,
+ boolean localhostAffinity) {
+ String localSpec = createSpec(servers, hostnameGetter, portGetter, localhostAffinity);
+ String ensembleSpec = localhostAffinity ? createSpec(servers, hostnameGetter, portGetter, false) : localSpec;
+ return new ConnectionSpec(localSpec, ensembleSpec, servers.size());
+ }
+
+ private static <T> String createSpec(List<T> servers,
+ Function<T, String> hostnameGetter,
+ Function<T, Integer> portGetter,
+ boolean localhostAffinity) {
+ String thisServer = HostName.getLocalhost();
+ StringBuilder connectionSpec = new StringBuilder();
+ for (var server : servers) {
+ if (localhostAffinity && !thisServer.equals(hostnameGetter.apply(server))) continue;
+ connectionSpec.append(hostnameGetter.apply(server));
+ connectionSpec.append(':');
+ connectionSpec.append(portGetter.apply(server));
+ connectionSpec.append(',');
+ }
+ if (localhostAffinity && connectionSpec.length() == 0) {
+ throw new IllegalArgumentException("Unable to create connect string to localhost: " +
+ "There is no localhost server specified in config");
+ }
+ if (connectionSpec.length() > 0) {
+ connectionSpec.setLength(connectionSpec.length() - 1); // Remove trailing comma
+ }
+ return connectionSpec.toString();
+ }
+
+ private static String requireNonEmpty(String s, String field) {
+ if (Objects.requireNonNull(s).isEmpty()) throw new IllegalArgumentException(field + " must be non-empty");
+ return s;
+ }
+
+}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
index 04bd64219d4..90eec5760fc 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
@@ -3,10 +3,11 @@ package com.yahoo.vespa.curator;
import com.google.inject.Inject;
import com.yahoo.cloud.config.ConfigserverConfig;
+import com.yahoo.cloud.config.CuratorConfig;
import com.yahoo.io.IOUtils;
-import com.yahoo.net.HostName;
import com.yahoo.path.Path;
import com.yahoo.text.Utf8;
+import com.yahoo.vespa.curator.api.VespaCurator;
import com.yahoo.vespa.curator.recipes.CuratorCounter;
import com.yahoo.vespa.defaults.Defaults;
import com.yahoo.vespa.zookeeper.VespaZooKeeperServer;
@@ -31,6 +32,7 @@ import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -47,7 +49,7 @@ import java.util.logging.Logger;
* @author vegardh
* @author bratseth
*/
-public class Curator implements AutoCloseable {
+public class Curator implements VespaCurator, AutoCloseable {
private static final Logger LOG = Logger.getLogger(Curator.class.getName());
private static final File ZK_CLIENT_CONFIG_FILE = new File(Defaults.getDefaults().underVespaHome("conf/zookeeper/zookeeper-client.cfg"));
@@ -55,81 +57,67 @@ public class Curator implements AutoCloseable {
private static final Duration ZK_CONNECTION_TIMEOUT = Duration.ofSeconds(30);
private static final Duration BASE_SLEEP_TIME = Duration.ofSeconds(1);
private static final int MAX_RETRIES = 10;
+ private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES);
- protected final RetryPolicy retryPolicy;
+ protected final RetryPolicy retryPolicy = DEFAULT_RETRY_POLICY;
private final CuratorFramework curatorFramework;
- private final String connectionSpec; // May be a subset of the servers in the ensemble
- private final String zooKeeperEnsembleConnectionSpec;
- private final int zooKeeperEnsembleCount;
+ private final ConnectionSpec connectionSpec;
// All lock keys, to allow re-entrancy. This will grow forever, but this should be too slow to be a problem
private final ConcurrentHashMap<Path, Lock> locks = new ConcurrentHashMap<>();
/** Creates a curator instance from a comma-separated string of ZooKeeper host:port strings */
public static Curator create(String connectionSpec) {
- return new Curator(connectionSpec, connectionSpec, Optional.of(ZK_CLIENT_CONFIG_FILE));
+ return new Curator(ConnectionSpec.create(connectionSpec), Optional.of(ZK_CLIENT_CONFIG_FILE));
}
// For testing only, use Optional.empty for clientConfigFile parameter to create default zookeeper client config
public static Curator create(String connectionSpec, Optional<File> clientConfigFile) {
- return new Curator(connectionSpec, connectionSpec, clientConfigFile);
+ return new Curator(ConnectionSpec.create(connectionSpec), clientConfigFile);
}
- // Depend on ZooKeeperServer to make sure it is started first
- // TODO: Move zookeeperserver config out of configserverconfig (requires update of controller services.xml as well)
@Inject
- public Curator(ConfigserverConfig configserverConfig, VespaZooKeeperServer server) {
- this(configserverConfig, Optional.of(ZK_CLIENT_CONFIG_FILE));
+ public Curator(CuratorConfig curatorConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) {
+ // Depends on ZooKeeperServer to make sure it is started first
+ this(ConnectionSpec.create(curatorConfig.server(),
+ CuratorConfig.Server::hostname,
+ CuratorConfig.Server::port,
+ curatorConfig.zookeeperLocalhostAffinity()),
+ Optional.of(ZK_CLIENT_CONFIG_FILE));
}
- Curator(ConfigserverConfig configserverConfig, Optional<File> clientConfigFile) {
- this(createConnectionSpec(configserverConfig), createEnsembleConnectionSpec(configserverConfig), clientConfigFile);
+ // TODO: This can be removed when this package is no longer public API.
+ public Curator(ConfigserverConfig configserverConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) {
+ this(ConnectionSpec.create(configserverConfig.zookeeperserver(),
+ ConfigserverConfig.Zookeeperserver::hostname,
+ ConfigserverConfig.Zookeeperserver::port,
+ configserverConfig.zookeeperLocalhostAffinity()),
+ Optional.of(ZK_CLIENT_CONFIG_FILE));
}
- private Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Optional<File> clientConfigFile) {
- this(connectionSpec,
- zooKeeperEnsembleConnectionSpec,
- (retryPolicy) -> CuratorFrameworkFactory
- .builder()
- .retryPolicy(retryPolicy)
- .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis())
- .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis())
- .connectString(connectionSpec)
- .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile)))
- .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8
- .build());
- }
-
- protected Curator(String connectionSpec,
- String zooKeeperEnsembleConnectionSpec,
- Function<RetryPolicy, CuratorFramework> curatorFactory) {
- this(connectionSpec, zooKeeperEnsembleConnectionSpec, curatorFactory,
- new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES));
+ protected Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Function<RetryPolicy, CuratorFramework> curatorFactory) {
+ this(ConnectionSpec.create(connectionSpec, zooKeeperEnsembleConnectionSpec), curatorFactory.apply(DEFAULT_RETRY_POLICY));
}
- private Curator(String connectionSpec,
- String zooKeeperEnsembleConnectionSpec,
- Function<RetryPolicy, CuratorFramework> curatorFactory,
- RetryPolicy retryPolicy) {
- this.connectionSpec = connectionSpec;
- this.retryPolicy = retryPolicy;
- this.curatorFramework = curatorFactory.apply(retryPolicy);
- if (this.curatorFramework != null) {
- validateConnectionSpec(connectionSpec);
- validateConnectionSpec(zooKeeperEnsembleConnectionSpec);
- addLoggingListener();
- curatorFramework.start();
- }
-
- this.zooKeeperEnsembleConnectionSpec = zooKeeperEnsembleConnectionSpec;
- this.zooKeeperEnsembleCount = zooKeeperEnsembleConnectionSpec.split(",").length;
+ Curator(ConnectionSpec connectionSpec, Optional<File> clientConfigFile) {
+ this(connectionSpec,
+ CuratorFrameworkFactory
+ .builder()
+ .retryPolicy(DEFAULT_RETRY_POLICY)
+ .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis())
+ .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis())
+ .connectString(connectionSpec.local())
+ .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile)))
+ .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8
+ .build());
}
- private static String createConnectionSpec(ConfigserverConfig configserverConfig) {
- return configserverConfig.zookeeperLocalhostAffinity()
- ? createConnectionSpecForLocalhost(configserverConfig)
- : createEnsembleConnectionSpec(configserverConfig);
+ private Curator(ConnectionSpec connectionSpec, CuratorFramework curatorFramework) {
+ this.connectionSpec = Objects.requireNonNull(connectionSpec);
+ this.curatorFramework = Objects.requireNonNull(curatorFramework);
+ addLoggingListener();
+ curatorFramework.start();
}
private static ZKClientConfig createClientConfig(Optional<File> clientConfigFile) {
@@ -148,39 +136,6 @@ public class Curator implements AutoCloseable {
}
}
- private static String createEnsembleConnectionSpec(ConfigserverConfig config) {
- StringBuilder connectionSpec = new StringBuilder();
- for (int i = 0; i < config.zookeeperserver().size(); i++) {
- if (connectionSpec.length() > 0) {
- connectionSpec.append(',');
- }
- ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i);
- connectionSpec.append(server.hostname());
- connectionSpec.append(':');
- connectionSpec.append(server.port());
- }
- return connectionSpec.toString();
- }
-
- static String createConnectionSpecForLocalhost(ConfigserverConfig config) {
- String thisServer = HostName.getLocalhost();
-
- for (int i = 0; i < config.zookeeperserver().size(); i++) {
- ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i);
- if (thisServer.equals(server.hostname())) {
- return String.format("%s:%d", server.hostname(), server.port());
- }
- }
-
- throw new IllegalArgumentException("Unable to create connect string to localhost: " +
- "There is no localhost server specified in config: " + config);
- }
-
- private static void validateConnectionSpec(String connectionSpec) {
- if (connectionSpec == null || connectionSpec.isEmpty())
- throw new IllegalArgumentException(String.format("Connections spec '%s' is not valid", connectionSpec));
- }
-
/**
* Returns the ZooKeeper "connect string" used by curator: a comma-separated list of
* host:port of ZooKeeper endpoints to connect to. This may be a subset of
@@ -189,7 +144,7 @@ public class Curator implements AutoCloseable {
*
* This may be empty but never null
*/
- public String connectionSpec() { return connectionSpec; }
+ public String connectionSpec() { return connectionSpec.local(); }
/** For internal use; prefer creating a {@link CuratorCounter} */
public DistributedAtomicLong createAtomicCounter(String path) {
@@ -243,13 +198,14 @@ public class Curator implements AutoCloseable {
* A convenience method which sets some content at a path.
* If the path and any of its parents does not exists they are created.
*/
+ // TODO: Use create().orSetData() in Curator 4 and later
public void set(Path path, byte[] data) {
+ if ( ! exists(path))
+ create(path);
+
String absolutePath = path.getAbsolute();
try {
- if ( ! exists(path))
- framework().create().creatingParentsIfNeeded().forPath(absolutePath, data);
- else
- framework().setData().forPath(absolutePath, data);
+ framework().setData().forPath(absolutePath, data);
} catch (Exception e) {
throw new RuntimeException("Could not set data at " + absolutePath, e);
}
@@ -432,7 +388,7 @@ public class Curator implements AutoCloseable {
* TODO: Move method out of this class.
*/
public String zooKeeperEnsembleConnectionSpec() {
- return zooKeeperEnsembleConnectionSpec;
+ return connectionSpec.ensemble();
}
/**
@@ -440,7 +396,7 @@ public class Curator implements AutoCloseable {
* WARNING: This may be different from the number of servers this Curator may connect to.
* TODO: Move method out of this class.
*/
- public int zooKeeperEnsembleCount() { return zooKeeperEnsembleCount; }
+ public int zooKeeperEnsembleCount() { return connectionSpec.ensembleSize(); }
private static Optional<String> getEnvironmentVariable(String variableName) {
return Optional.ofNullable(System.getenv().get(variableName))
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java
index fb8c303db66..0f8c524fa98 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorCompletionWaiter.java
@@ -57,6 +57,8 @@ class CuratorCompletionWaiter implements Curator.CompletionWaiter {
List<String> respondents;
do {
respondents = curator.framework().getChildren().forPath(barrierPath);
+ log.log(Level.FINE, respondents.size() + "/" + curator.zooKeeperEnsembleCount() + " responded: " +
+ respondents + ", all participants: " + curator.zooKeeperEnsembleConnectionSpec());
// First, check if all config servers responded
if (respondents.size() == curator.zooKeeperEnsembleCount()) {
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java
new file mode 100644
index 00000000000..0b6fa55719e
--- /dev/null
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java
@@ -0,0 +1,19 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.curator.api;
+
+import com.yahoo.path.Path;
+
+import java.time.Duration;
+
+/**
+ * A client for a ZooKeeper cluster running inside Vespa. Applications that want to use ZooKeeper can inject this in
+ * their code.
+ *
+ * @author mpolden
+ */
+public interface VespaCurator {
+
+ /** Create and acquire a re-entrant lock in given path. This blocks until the lock is acquired or timeout elapses. */
+ AutoCloseable lock(Path path, Duration timeout);
+
+}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/package-info.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/package-info.java
new file mode 100644
index 00000000000..679dd3750cb
--- /dev/null
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/package-info.java
@@ -0,0 +1,10 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * @author mpolden
+ */
+@PublicApi
+@ExportPackage
+package com.yahoo.vespa.curator.api;
+
+import com.yahoo.api.annotations.PublicApi;
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
index 1f583ada7a1..26f1c336874 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
@@ -1,110 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.curator.mock;
-import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.google.inject.Inject;
-import com.yahoo.collections.Pair;
-import com.yahoo.concurrent.Lock;
-import com.yahoo.concurrent.Locks;
import com.yahoo.path.Path;
-import com.yahoo.vespa.curator.CompletionTimeoutException;
import com.yahoo.vespa.curator.Curator;
-import com.yahoo.vespa.curator.recipes.CuratorLockException;
-import org.apache.curator.CuratorZookeeperClient;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.WatcherRemoveCuratorFramework;
-import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
-import org.apache.curator.framework.api.ACLCreateModeBackgroundPathAndBytesable;
-import org.apache.curator.framework.api.ACLCreateModePathAndBytesable;
-import org.apache.curator.framework.api.ACLCreateModeStatBackgroundPathAndBytesable;
-import org.apache.curator.framework.api.ACLPathAndBytesable;
-import org.apache.curator.framework.api.ACLableExistBuilderMain;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.BackgroundPathAndBytesable;
-import org.apache.curator.framework.api.BackgroundPathable;
-import org.apache.curator.framework.api.BackgroundVersionable;
-import org.apache.curator.framework.api.ChildrenDeletable;
-import org.apache.curator.framework.api.CreateBackgroundModeStatACLable;
-import org.apache.curator.framework.api.CreateBuilder;
-import org.apache.curator.framework.api.CreateBuilder2;
-import org.apache.curator.framework.api.CreateBuilderMain;
-import org.apache.curator.framework.api.CreateProtectACLCreateModePathAndBytesable;
-import org.apache.curator.framework.api.CuratorListener;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.api.DeleteBuilder;
-import org.apache.curator.framework.api.DeleteBuilderMain;
-import org.apache.curator.framework.api.ErrorListenerPathAndBytesable;
-import org.apache.curator.framework.api.ErrorListenerPathable;
-import org.apache.curator.framework.api.ExistsBuilder;
-import org.apache.curator.framework.api.GetACLBuilder;
-import org.apache.curator.framework.api.GetChildrenBuilder;
-import org.apache.curator.framework.api.GetConfigBuilder;
-import org.apache.curator.framework.api.GetDataBuilder;
-import org.apache.curator.framework.api.GetDataWatchBackgroundStatable;
-import org.apache.curator.framework.api.PathAndBytesable;
-import org.apache.curator.framework.api.Pathable;
-import org.apache.curator.framework.api.ProtectACLCreateModeStatPathAndBytesable;
-import org.apache.curator.framework.api.ReconfigBuilder;
-import org.apache.curator.framework.api.RemoveWatchesBuilder;
-import org.apache.curator.framework.api.SetACLBuilder;
-import org.apache.curator.framework.api.SetDataBackgroundVersionable;
-import org.apache.curator.framework.api.SetDataBuilder;
-import org.apache.curator.framework.api.SyncBuilder;
-import org.apache.curator.framework.api.UnhandledErrorListener;
-import org.apache.curator.framework.api.VersionPathAndBytesable;
-import org.apache.curator.framework.api.WatchPathable;
-import org.apache.curator.framework.api.Watchable;
-import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
-import org.apache.curator.framework.api.transaction.CuratorTransaction;
-import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
-import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
-import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
-import org.apache.curator.framework.api.transaction.TransactionCheckBuilder;
-import org.apache.curator.framework.api.transaction.TransactionCreateBuilder;
-import org.apache.curator.framework.api.transaction.TransactionCreateBuilder2;
-import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder;
-import org.apache.curator.framework.api.transaction.TransactionOp;
-import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.recipes.atomic.AtomicStats;
-import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.NodeCacheListener;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.framework.schema.SchemaSet;
-import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.utils.EnsurePath;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import java.nio.file.Paths;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static com.yahoo.vespa.curator.mock.MemoryFileSystem.Node;
/**
* <p>A <b>non thread safe</b> mock of the curator API.
@@ -121,24 +25,7 @@ import static com.yahoo.vespa.curator.mock.MemoryFileSystem.Node;
*/
public class MockCurator extends Curator {
- public boolean timeoutOnLock = false;
- public boolean throwExceptionOnLock = false;
- private boolean shouldTimeoutOnEnter = false;
- private int monotonicallyIncreasingNumber = 0;
- private final boolean stableOrdering;
private String zooKeeperEnsembleConnectionSpec = "";
- private final Locks<String> locks = new Locks<>(Long.MAX_VALUE, TimeUnit.DAYS);
-
- /** The file system used by this mock to store zookeeper files and directories */
- private final MemoryFileSystem fileSystem = new MemoryFileSystem();
-
- /** Atomic counters. A more accurate mock would store these as files in the file system */
- private final Map<String, MockAtomicCounter> atomicCounters = new ConcurrentHashMap<>();
-
- /** Listeners to changes to a particular path */
- private final ListenerMap listeners = new ListenerMap();
-
- private final CuratorFramework curatorFramework;
/** Creates a mock curator with stable ordering */
@Inject
@@ -153,26 +40,24 @@ public class MockCurator extends Curator {
* This is not what ZooKeeper does.
*/
public MockCurator(boolean stableOrdering) {
- super("", "", (retryPolicy) -> null);
- this.stableOrdering = stableOrdering;
- curatorFramework = new MockCuratorFramework();
- curatorFramework.start();
+ super("host1:2181", "host1:2181", (retryPolicy) -> new MockCuratorFramework(stableOrdering, false));
+ }
+
+ private MockCuratorFramework mockFramework() {
+ return (MockCuratorFramework) super.framework();
}
/**
* Lists the entire content of this curator instance as a multiline string.
* Useful for debugging.
*/
- public String dumpState() { return fileSystem.dumpState(); }
-
- /** Returns a started curator framework */
- public CuratorFramework framework() { return curatorFramework; }
+ public String dumpState() { return mockFramework().fileSystem().dumpState(); }
/** Returns an atomic counter in this, or empty if no such counter is created */
public Optional<DistributedAtomicLong> counter(String path) {
- return Optional.ofNullable(atomicCounters.get(path));
+ return Optional.ofNullable(mockFramework().atomicCounters().get(path));
}
-
+
/**
* Sets the ZooKeeper ensemble connection spec, which must be on the form
* host1:port,host2:port ...
@@ -186,1455 +71,37 @@ public class MockCurator extends Curator {
return zooKeeperEnsembleConnectionSpec;
}
- // ----- Start of adaptor methods from Curator to the mock file system -----
-
- /** Creates a node below the given directory root */
- private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, Node root, Listeners listeners)
- throws KeeperException.NodeExistsException, KeeperException.NoNodeException {
- validatePath(pathString);
- Path path = Path.fromString(pathString);
- if (path.isRoot()) return "/"; // the root already exists
- Node parent = root.getNode(Paths.get(path.getParentPath().toString()), createParents);
- String name = nodeName(path.getName(), createMode);
-
- if (parent == null)
- throw new KeeperException.NoNodeException(path.getParentPath().toString());
- if (parent.children().containsKey(path.getName()))
- throw new KeeperException.NodeExistsException(path.toString());
-
- parent.add(name).setContent(content);
- String nodePath = "/" + path.getParentPath().toString() + "/" + name;
- listeners.notify(Path.fromString(nodePath), content, PathChildrenCacheEvent.Type.CHILD_ADDED);
- return nodePath;
- }
-
- /** Deletes a node below the given directory root */
- private void deleteNode(String pathString, boolean deleteChildren, Node root, Listeners listeners)
- throws KeeperException.NoNodeException, KeeperException.NotEmptyException {
- validatePath(pathString);
- Path path = Path.fromString(pathString);
- Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false);
- if (parent == null) throw new KeeperException.NoNodeException(path.toString());
- Node node = parent.children().get(path.getName());
- if (node == null) throw new KeeperException.NoNodeException(path.getName() + " under " + parent);
- if ( ! node.children().isEmpty() && ! deleteChildren)
- throw new KeeperException.NotEmptyException(path.toString());
- parent.remove(path.getName());
- listeners.notify(path, new byte[0], PathChildrenCacheEvent.Type.CHILD_REMOVED);
- }
-
- /** Returns the data of a node */
- private byte[] getData(String pathString, Node root) throws KeeperException.NoNodeException {
- validatePath(pathString);
- return getNode(pathString, root).getContent();
- }
-
- /** sets the data of an existing node */
- private void setData(String pathString, byte[] content, Node root, Listeners listeners)
- throws KeeperException.NoNodeException {
- validatePath(pathString);
- getNode(pathString, root).setContent(content);
- listeners.notify(Path.fromString(pathString), content, PathChildrenCacheEvent.Type.CHILD_UPDATED);
- }
-
- private List<String> getChildren(String path, Node root) throws KeeperException.NoNodeException {
- validatePath(path);
- Node node = root.getNode(Paths.get(path), false);
- if (node == null) throw new KeeperException.NoNodeException(path);
- List<String> children = new ArrayList<>(node.children().keySet());
- if (! stableOrdering)
- Collections.shuffle(children);
- return children;
- }
-
- private boolean exists(String path, Node root) {
- validatePath(path);
- Node parent = root.getNode(Paths.get(Path.fromString(path).getParentPath().toString()), false);
- if (parent == null) return false;
- Node node = parent.children().get(Path.fromString(path).getName());
- return node != null;
- }
-
- /** Returns a node or throws the appropriate exception if it doesn't exist */
- private Node getNode(String pathString, Node root) throws KeeperException.NoNodeException {
- validatePath(pathString);
- Path path = Path.fromString(pathString);
- Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false);
- if (parent == null) throw new KeeperException.NoNodeException(path.toString());
- Node node = parent.children().get(path.getName());
- if (node == null) throw new KeeperException.NoNodeException(path.toString());
- return node;
- }
-
- private String nodeName(String baseName, CreateMode createMode) {
- switch (createMode) {
- case PERSISTENT: case EPHEMERAL: return baseName;
- case PERSISTENT_SEQUENTIAL: case EPHEMERAL_SEQUENTIAL: return baseName + monotonicallyIncreasingNumber++;
- default: throw new UnsupportedOperationException(createMode + " support not implemented in MockCurator");
- }
- }
-
- /** Validates a path using the same rules as ZooKeeper */
- public static String validatePath(String path) throws IllegalArgumentException {
- if (path == null) throw new IllegalArgumentException("Path cannot be null");
- if (path.length() == 0) throw new IllegalArgumentException("Path length must be > 0");
- if (path.charAt(0) != '/') throw new IllegalArgumentException("Path must start with / character");
- if (path.length() == 1) return path; // done checking - it's the root
- if (path.charAt(path.length() - 1) == '/')
- throw new IllegalArgumentException("Path must not end with / character");
-
- String reason = null;
- char lastc = '/';
- char chars[] = path.toCharArray();
- char c;
- for (int i = 1; i < chars.length; lastc = chars[i], i++) {
- c = chars[i];
-
- if (c == 0) {
- reason = "null character not allowed @" + i;
- break;
- } else if (c == '/' && lastc == '/') {
- reason = "empty node name specified @" + i;
- break;
- } else if (c == '.' && lastc == '.') {
- if (chars[i-2] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) {
- reason = "relative paths not allowed @" + i;
- break;
- }
- } else if (c == '.') {
- if (chars[i-1] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) {
- reason = "relative paths not allowed @" + i;
- break;
- }
- } else if (c > '\u0000' && c < '\u001f' || c > '\u007f' && c < '\u009F'
- || c > '\ud800' && c < '\uf8ff' || c > '\ufff0' && c < '\uffff') {
- reason = "invalid charater @" + i;
- break;
- }
- }
-
- if (reason != null)
- throw new IllegalArgumentException("Invalid path string \"" + path + "\" caused by " + reason);
- return path;
- }
-
- // ----- Mock of Curator recipes accessed through our Curator interface -----
-
@Override
public DistributedAtomicLong createAtomicCounter(String path) {
- MockAtomicCounter counter = atomicCounters.get(path);
- if (counter == null) {
- counter = new MockAtomicCounter(path);
- atomicCounters.put(path, counter);
- }
- return counter;
+ return mockFramework().createAtomicCounter(path);
}
- /** Create a mutex which ensures exclusive access within this single vm */
@Override
public InterProcessLock createMutex(String path) {
- return new MockLock(path);
- }
-
- public MockCurator timeoutBarrierOnEnter(boolean shouldTimeout) {
- shouldTimeoutOnEnter = shouldTimeout;
- return this;
+ return mockFramework().createMutex(path);
}
@Override
public CompletionWaiter getCompletionWaiter(Path parentPath, int numMembers, String id) {
- return new MockCompletionWaiter();
+ return mockFramework().createCompletionWaiter();
}
@Override
public CompletionWaiter createCompletionWaiter(Path parentPath, String waiterNode, int numMembers, String id) {
- return new MockCompletionWaiter();
+ return mockFramework().createCompletionWaiter();
}
@Override
public DirectoryCache createDirectoryCache(String path, boolean cacheData, boolean dataIsCompressed, ExecutorService executorService) {
- return new MockDirectoryCache(Path.fromString(path));
+ return mockFramework().createDirectoryCache(path);
}
@Override
public FileCache createFileCache(String path, boolean dataIsCompressed) {
- return new MockFileCache(Path.fromString(path));
+ return mockFramework().createFileCache(path);
}
@Override
public int zooKeeperEnsembleCount() { return 1; }
- /**
- * Invocation of changes to the file system state is abstracted through this to allow transactional
- * changes to notify on commit
- */
- private abstract class Listeners {
-
- /** Translating method */
- public final void notify(Path path, byte[] data, PathChildrenCacheEvent.Type type) {
- String pathString = "/" + path.toString(); // this silly path class strips the leading "/" :-/
- PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, new ChildData(pathString, null, data));
- notify(path, event);
- }
-
- public abstract void notify(Path path, PathChildrenCacheEvent event);
-
- }
-
- /** The regular listener implementation which notifies registered file and directory listeners */
- private class ListenerMap extends Listeners {
-
- private final Map<Path, PathChildrenCacheListener> directoryListeners = new ConcurrentHashMap<>();
- private final Map<Path, NodeCacheListener> fileListeners = new ConcurrentHashMap<>();
-
- public void add(Path path, PathChildrenCacheListener listener) {
- directoryListeners.put(path, listener);
- }
-
- public void add(Path path, NodeCacheListener listener) {
- fileListeners.put(path, listener);
- }
-
- @Override
- public void notify(Path path, PathChildrenCacheEvent event) {
- try {
- // Snapshot directoryListeners in case notification leads to new directoryListeners added
- Set<Map.Entry<Path, PathChildrenCacheListener>> directoryListenerSnapshot = new HashSet<>(directoryListeners.entrySet());
- for (Map.Entry<Path, PathChildrenCacheListener> listener : directoryListenerSnapshot) {
- if (path.isChildOf(listener.getKey()))
- listener.getValue().childEvent(curatorFramework, event);
- }
-
- // Snapshot directoryListeners in case notification leads to new directoryListeners added
- Set<Map.Entry<Path, NodeCacheListener>> fileListenerSnapshot = new HashSet<>(fileListeners.entrySet());
- for (Map.Entry<Path, NodeCacheListener> listener : fileListenerSnapshot) {
- if (path.equals(listener.getKey()))
- listener.getValue().nodeChanged();
- }
- }
- catch (Exception e) {
- e.printStackTrace(); // TODO: Remove
- throw new RuntimeException("Exception notifying listeners", e);
- }
- }
-
- }
-
- private class MockCompletionWaiter implements CompletionWaiter {
-
- @Override
- public void awaitCompletion(Duration timeout) {
- if (shouldTimeoutOnEnter) {
- throw new CompletionTimeoutException("");
- }
- }
-
- @Override
- public void notifyCompletion() {
- }
-
- }
-
- /** A lock which works inside a single vm */
- private class MockLock extends InterProcessSemaphoreMutex {
-
- private final String path;
-
- private Lock lock = null;
-
- public MockLock(String path) {
- super(curatorFramework, path);
- this.path = path;
- }
-
- @Override
- public boolean acquire(long timeout, TimeUnit unit) {
- if (throwExceptionOnLock)
- throw new CuratorLockException("Thrown by mock");
- if (timeoutOnLock) return false;
-
- try {
- lock = locks.lock(path, timeout, unit);
- return true;
- }
- catch (UncheckedTimeoutException e) {
- return false;
- }
- }
-
- @Override
- public void acquire() {
- if (throwExceptionOnLock)
- throw new CuratorLockException("Thrown by mock");
-
- lock = locks.lock(path);
- }
-
- @Override
- public void release() {
- if (lock != null)
- lock.close();
- }
-
- }
-
- private class MockAtomicCounter extends DistributedAtomicLong {
-
- private boolean initialized = false;
- private MockLongValue value = new MockLongValue(0); // yes, uninitialized returns 0 :-/
-
- public MockAtomicCounter(String path) {
- super(curatorFramework, path, retryPolicy);
- }
-
- @Override
- public boolean initialize(Long value) {
- if (initialized) return false;
- this.value = new MockLongValue(value);
- initialized = true;
- return true;
- }
-
- @Override
- public AtomicValue<Long> get() {
- if (value == null) return new MockLongValue(0);
- return value;
- }
-
- public AtomicValue<Long> add(Long delta) throws Exception {
- return trySet(value.postValue() + delta);
- }
-
- public AtomicValue<Long> subtract(Long delta) throws Exception {
- return trySet(value.postValue() - delta);
- }
-
- @Override
- public AtomicValue<Long> increment() {
- return trySet(value.postValue() + 1);
- }
-
- public AtomicValue<Long> decrement() throws Exception {
- return trySet(value.postValue() - 1);
- }
-
- @Override
- public AtomicValue<Long> trySet(Long longval) {
- value = new MockLongValue(longval);
- return value;
- }
-
- public void forceSet(Long newValue) throws Exception {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- public AtomicValue<Long> compareAndSet(Long expectedValue, Long newValue) throws Exception {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- }
-
- private class MockLongValue implements AtomicValue<Long> {
-
- private AtomicLong value = new AtomicLong();
-
- public MockLongValue(long value) {
- this.value.set(value);
- }
-
- @Override
- public boolean succeeded() {
- return true;
- }
-
- public void setValue(long value) {
- this.value.set(value);
- }
-
- @Override
- public Long preValue() {
- return value.get();
- }
-
- @Override
- public Long postValue() {
- return value.get();
- }
-
- @Override
- public AtomicStats getStats() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- }
-
- private class MockDirectoryCache implements DirectoryCache {
-
- /** The path this is caching and listening to */
- private Path path;
-
- public MockDirectoryCache(Path path) {
- this.path = path;
- }
-
- @Override
- public void start() {}
-
- @Override
- public void addListener(PathChildrenCacheListener listener) {
- listeners.add(path, listener);
- }
-
- @Override
- public List<ChildData> getCurrentData() {
- List<ChildData> childData = new ArrayList<>();
- for (String childName : getChildren(path)) {
- Path childPath = path.append(childName);
- childData.add(new ChildData(childPath.getAbsolute(), null, getData(childPath).get()));
- }
- return childData;
- }
-
- @Override
- public ChildData getCurrentData(Path fullPath) {
- if (!fullPath.getParentPath().equals(path)) {
- throw new IllegalArgumentException("Path '" + fullPath + "' is not a child path of '" + path + "'");
- }
-
- return getData(fullPath).map(bytes -> new ChildData(fullPath.getAbsolute(), null, bytes)).orElse(null);
- }
-
- private void collectData(Node parent, Path parentPath, List<ChildData> data) {
- for (Node child : parent.children().values()) {
- Path childPath = parentPath.append(child.name());
- data.add(new ChildData("/" + childPath.toString(), null, child.getContent()));
- }
- }
-
- @Override
- public void close() {}
-
- }
-
- private class MockFileCache implements FileCache {
-
- /** The path this is caching and listening to */
- private Path path;
-
- public MockFileCache(Path path) {
- this.path = path;
- }
-
- @Override
- public void start() {}
-
- @Override
- public void addListener(NodeCacheListener listener) {
- listeners.add(path, listener);
- }
-
- @Override
- public ChildData getCurrentData() {
- Node node = fileSystem.root().getNode(Paths.get(path.toString()), false);
- if (node == null) return null;
- return new ChildData("/" + path.toString(), null, node.getContent());
- }
-
- @Override
- public void close() {}
-
- }
-
- // ----- The rest of this file is adapting the Curator (non-recipe) API to the -----
- // ----- file system methods above. -----
- // ----- There's nothing to see unless you are interested in an illustration of -----
- // ----- the folly of fluent API's or, more generally, mankind. -----
- private abstract static class MockProtectACLCreateModeStatPathAndBytesable<String>
- implements ProtectACLCreateModeStatPathAndBytesable<String> {
-
- public BackgroundPathAndBytesable<String> withACL(List<ACL> list) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- public BackgroundPathAndBytesable<String> withACL(List<ACL> list, boolean b) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- public ProtectACLCreateModeStatPathAndBytesable<String> withMode(CreateMode createMode) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ACLCreateModeBackgroundPathAndBytesable<java.lang.String> withProtection() {
- return null;
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground() {
- return null;
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(Object o) {
- return null;
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback) {
- return null;
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o) {
- return null;
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
- return null;
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
- return null;
- }
-
- @Override
- public ACLBackgroundPathAndBytesable<String> storingStatIn(Stat stat) {
- return null;
- }
-
- }
-
- private class MockCreateBuilder implements CreateBuilder {
-
- private boolean createParents = false;
- private CreateMode createMode = CreateMode.PERSISTENT;
-
- @Override
- public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentsIfNeeded() {
- createParents = true;
- return new MockProtectACLCreateModeStatPathAndBytesable<>() {
-
- @Override
- public String forPath(String s, byte[] bytes) throws Exception {
- return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners);
- }
-
- @Override
- public String forPath(String s) throws Exception {
- return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners);
- }
-
- };
- }
-
- @Override
- public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentContainersIfNeeded() {
- return new MockProtectACLCreateModeStatPathAndBytesable<>() {
-
- @Override
- public String forPath(String s, byte[] bytes) throws Exception {
- return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners);
- }
-
- @Override
- public String forPath(String s) throws Exception {
- return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners);
- }
-
- };
- }
-
- @Override
- @Deprecated
- public ACLPathAndBytesable<String> withProtectedEphemeralSequential() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ACLCreateModeStatBackgroundPathAndBytesable<String> withProtection() {
- return null;
- }
-
- public String forPath(String s) throws Exception {
- return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners);
- }
-
- public String forPath(String s, byte[] bytes) throws Exception {
- return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners);
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(Object o) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public CreateBuilderMain withTtl(long l) {
- return null;
- }
-
- @Override
- public CreateBuilder2 orSetData() {
- return null;
- }
-
- @Override
- public CreateBuilder2 orSetData(int i) {
- return null;
- }
-
- @Override
- public CreateBackgroundModeStatACLable compressed() {
- return null;
- }
-
- @Override
- public CreateProtectACLCreateModePathAndBytesable<String> storingStatIn(Stat stat) {
- return null;
- }
-
- @Override
- public BackgroundPathAndBytesable<String> withACL(List<ACL> list) {
- return null;
- }
-
- @Override
- public ACLBackgroundPathAndBytesable<String> withMode(CreateMode createMode) {
- this.createMode = createMode;
- return this;
- }
-
- @Override
- public BackgroundPathAndBytesable<String> withACL(List<ACL> list, boolean b) {
- return null;
- }
- }
-
- private static class MockBackgroundPathableBuilder<T> implements BackgroundPathable<T>, Watchable<BackgroundPathable<T>> {
-
- @Override
- public ErrorListenerPathable<T> inBackground() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<T> inBackground(Object o) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Object o) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public T forPath(String s) throws Exception {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public BackgroundPathable<T> watched() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public BackgroundPathable<T> usingWatcher(Watcher watcher) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public BackgroundPathable<T> usingWatcher(CuratorWatcher curatorWatcher) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
- }
-
- private class MockGetChildrenBuilder extends MockBackgroundPathableBuilder<List<String>> implements GetChildrenBuilder {
-
- @Override
- public WatchPathable<List<String>> storingStatIn(Stat stat) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public List<String> forPath(String path) throws Exception {
- return getChildren(path, fileSystem.root());
- }
-
- }
-
- private class MockExistsBuilder extends MockBackgroundPathableBuilder<Stat> implements ExistsBuilder {
-
- @Override
- public Stat forPath(String path) throws Exception {
- try {
- Node node = getNode(path, fileSystem.root());
- Stat stat = new Stat();
- stat.setVersion(node.version());
- return stat;
- }
- catch (KeeperException.NoNodeException e) {
- return null;
- }
- }
-
- @Override
- public ACLableExistBuilderMain creatingParentsIfNeeded() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ACLableExistBuilderMain creatingParentContainersIfNeeded() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
- }
-
- private class MockDeleteBuilder extends MockBackgroundPathableBuilder<Void> implements DeleteBuilder {
-
- private boolean deleteChildren = false;
-
- @Override
- public BackgroundVersionable deletingChildrenIfNeeded() {
- deleteChildren = true;
- return this;
- }
-
- @Override
- public ChildrenDeletable guaranteed() {
- return this;
- }
-
- @Override
- public BackgroundPathable<Void> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- public Void forPath(String pathString) throws Exception {
- deleteNode(pathString, deleteChildren, fileSystem.root(), listeners);
- return null;
- }
-
- @Override
- public DeleteBuilderMain quietly() {
- return this;
- }
- }
-
- private class MockGetDataBuilder extends MockBackgroundPathableBuilder<byte[]> implements GetDataBuilder {
-
- @Override
- public GetDataWatchBackgroundStatable decompressed() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- public byte[] forPath(String path) throws Exception {
- return getData(path, fileSystem.root());
- }
-
- @Override
- public ErrorListenerPathable<byte[]> inBackground() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<byte[]> inBackground(Object o) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback, Object o) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public WatchPathable<byte[]> storingStatIn(Stat stat) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
- }
-
- // extends MockBackgroundACLPathAndBytesableBuilder<Stat>
- private class MockSetDataBuilder implements SetDataBuilder {
-
- @Override
- public SetDataBackgroundVersionable compressed() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public BackgroundPathAndBytesable<Stat> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public Stat forPath(String path, byte[] bytes) throws Exception {
- setData(path, bytes, fileSystem.root(), listeners);
- return null;
- }
-
- @Override
- public Stat forPath(String s) throws Exception {
- return null;
- }
-
- @Override
- public ErrorListenerPathAndBytesable<Stat> inBackground() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<Stat> inBackground(Object o) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Object o) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
- }
-
- /** Allows addition of directoryListeners which are never called */
- private class MockListenable<T> implements Listenable<T> {
-
- @Override
- public void addListener(T t) {
- }
-
- @Override
- public void addListener(T t, Executor executor) {
- }
-
- @Override
- public void removeListener(T t) {
- }
-
- }
-
- private class MockCuratorTransactionFinal implements CuratorTransactionFinal {
-
- /** The new directory root in which the transactional changes are made */
- private Node newRoot;
-
- private boolean committed = false;
-
- private final DelayedListener delayedListener = new DelayedListener();
-
- public MockCuratorTransactionFinal() {
- newRoot = fileSystem.root().clone();
- }
-
- @Override
- public Collection<CuratorTransactionResult> commit() throws Exception {
- fileSystem.replaceRoot(newRoot);
- committed = true;
- delayedListener.commit();
- return null; // TODO
- }
-
- @Override
- public TransactionCreateBuilder create() {
- ensureNotCommitted();
- return new MockTransactionCreateBuilder();
- }
-
- @Override
- public TransactionDeleteBuilder delete() {
- ensureNotCommitted();
- return new MockTransactionDeleteBuilder();
- }
-
- @Override
- public TransactionSetDataBuilder setData() {
- ensureNotCommitted();
- return new MockTransactionSetDataBuilder();
- }
-
- @Override
- public TransactionCheckBuilder check() {
- ensureNotCommitted();
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- private void ensureNotCommitted() {
- if (committed) throw new IllegalStateException("transaction already committed");
- }
-
- private class MockTransactionCreateBuilder implements TransactionCreateBuilder {
-
- private CreateMode createMode = CreateMode.PERSISTENT;
-
- @Override
- public ACLCreateModePathAndBytesable<CuratorTransactionBridge> compressed() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ACLPathAndBytesable<CuratorTransactionBridge> withMode(CreateMode createMode) {
- this.createMode = createMode;
- return this;
- }
-
- @Override
- public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception {
- createNode(s, bytes, false, createMode, newRoot, delayedListener);
- return new MockCuratorTransactionBridge();
- }
-
- @Override
- public CuratorTransactionBridge forPath(String s) throws Exception {
- createNode(s, new byte[0], false, createMode, newRoot, delayedListener);
- return new MockCuratorTransactionBridge();
- }
-
- @Override
- public TransactionCreateBuilder2 withTtl(long l) {
- return this;
- }
-
- @Override
- public Object withACL(List list, boolean b) {
- return this;
- }
-
- @Override
- public Object withACL(List list) {
- return this;
- }
- }
-
- private class MockTransactionDeleteBuilder implements TransactionDeleteBuilder {
-
- @Override
- public Pathable<CuratorTransactionBridge> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public CuratorTransactionBridge forPath(String path) throws Exception {
- deleteNode(path, false, newRoot, delayedListener);
- return new MockCuratorTransactionBridge();
- }
-
- }
-
- private class MockTransactionSetDataBuilder implements TransactionSetDataBuilder {
-
- @Override
- public VersionPathAndBytesable<CuratorTransactionBridge> compressed() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public PathAndBytesable<CuratorTransactionBridge> withVersion(int i) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception {
- MockCurator.this.setData(s, bytes, newRoot, delayedListener);
- return new MockCuratorTransactionBridge();
- }
-
- @Override
- public CuratorTransactionBridge forPath(String s) throws Exception {
- MockCurator.this.setData(s, new byte[0], newRoot, delayedListener);
- return new MockCuratorTransactionBridge();
- }
-
- }
-
- private class MockCuratorTransactionBridge implements CuratorTransactionBridge {
-
- @Override
- public CuratorTransactionFinal and() {
- return MockCuratorTransactionFinal.this;
- }
-
- }
-
- /** A class which collects listen events and forwards them to the regular directoryListeners on commit */
- private class DelayedListener extends Listeners {
-
- private final List<Pair<Path, PathChildrenCacheEvent>> events = new ArrayList<>();
-
- @Override
- public void notify(Path path, PathChildrenCacheEvent event) {
- events.add(new Pair<>(path, event));
- }
-
- public void commit() {
- for (Pair<Path, PathChildrenCacheEvent> event : events)
- listeners.notify(event.getFirst(), event.getSecond());
- }
-
- }
-
- }
-
- private class MockCuratorFramework implements CuratorFramework {
-
- private CuratorFrameworkState curatorState = CuratorFrameworkState.LATENT;
-
- @Override
- public void start() {
- curatorState = CuratorFrameworkState.STARTED;
- }
-
- @Override
- public void close() {
- curatorState = CuratorFrameworkState.STOPPED;
- }
-
- @Override
- public CuratorFrameworkState getState() {
- return curatorState;
- }
-
- @Override
- @Deprecated
- public boolean isStarted() {
- return curatorState == CuratorFrameworkState.STARTED;
- }
-
- @Override
- public CreateBuilder create() {
- return new MockCreateBuilder();
- }
-
- @Override
- public DeleteBuilder delete() {
- return new MockDeleteBuilder();
- }
-
- @Override
- public ExistsBuilder checkExists() {
- return new MockExistsBuilder();
- }
-
- @Override
- public GetDataBuilder getData() {
- return new MockGetDataBuilder();
- }
-
- @Override
- public SetDataBuilder setData() {
- return new MockSetDataBuilder();
- }
-
- @Override
- public GetChildrenBuilder getChildren() {
- return new MockGetChildrenBuilder();
- }
-
- @Override
- public GetACLBuilder getACL() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public SetACLBuilder setACL() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public ReconfigBuilder reconfig() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public GetConfigBuilder getConfig() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public CuratorTransaction inTransaction() {
- return new MockCuratorTransactionFinal();
- }
-
- @Override
- public CuratorMultiTransaction transaction() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public TransactionOp transactionOp() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- @Deprecated
- public void sync(String path, Object backgroundContextObject) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public void createContainers(String s) throws Exception {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public Listenable<ConnectionStateListener> getConnectionStateListenable() {
- return new MockListenable<>();
- }
-
- @Override
- public Listenable<CuratorListener> getCuratorListenable() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- @Deprecated
- public CuratorFramework nonNamespaceView() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public CuratorFramework usingNamespace(String newNamespace) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public String getNamespace() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public CuratorZookeeperClient getZookeeperClient() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Deprecated
- @Override
- public EnsurePath newNamespaceAwareEnsurePath(String path) {
- return new EnsurePath(path);
- }
-
- @Override
- public void clearWatcherReferences(Watcher watcher) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public boolean blockUntilConnected(int i, TimeUnit timeUnit) throws InterruptedException {
- return true;
- }
-
- @Override
- public void blockUntilConnected() throws InterruptedException {
-
- }
-
- @Override
- public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework() {
- return new WatcherRemoveCuratorFramework() {
- @Override
- public void removeWatchers() {
-
- }
-
- @Override
- public void start() {
-
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public CuratorFrameworkState getState() {
- return null;
- }
-
- @Override
- public boolean isStarted() {
- return false;
- }
-
- @Override
- public CreateBuilder create() {
- return null;
- }
-
- @Override
- public DeleteBuilder delete() {
- return null;
- }
-
- @Override
- public ExistsBuilder checkExists() {
- return null;
- }
-
- @Override
- public GetDataBuilder getData() {
- return null;
- }
-
- @Override
- public SetDataBuilder setData() {
- return null;
- }
-
- @Override
- public GetChildrenBuilder getChildren() {
- return null;
- }
-
- @Override
- public GetACLBuilder getACL() {
- return null;
- }
-
- @Override
- public SetACLBuilder setACL() {
- return null;
- }
-
- @Override
- public ReconfigBuilder reconfig() {
- return null;
- }
-
- @Override
- public GetConfigBuilder getConfig() {
- return null;
- }
-
- @Override
- public CuratorTransaction inTransaction() {
- return null;
- }
-
- @Override
- public CuratorMultiTransaction transaction() {
- return null;
- }
-
- @Override
- public TransactionOp transactionOp() {
- return null;
- }
-
- @Override
- public void sync(String s, Object o) {
-
- }
-
- @Override
- public void createContainers(String s) throws Exception {
-
- }
-
- @Override
- public SyncBuilder sync() {
- return null;
- }
-
- @Override
- public RemoveWatchesBuilder watches() {
- return null;
- }
-
- @Override
- public Listenable<ConnectionStateListener> getConnectionStateListenable() {
- return null;
- }
-
- @Override
- public Listenable<CuratorListener> getCuratorListenable() {
- return null;
- }
-
- @Override
- public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() {
- return null;
- }
-
- @Override
- public CuratorFramework nonNamespaceView() {
- return null;
- }
-
- @Override
- public CuratorFramework usingNamespace(String s) {
- return null;
- }
-
- @Override
- public String getNamespace() {
- return null;
- }
-
- @Override
- public CuratorZookeeperClient getZookeeperClient() {
- return null;
- }
-
- @Override
- public EnsurePath newNamespaceAwareEnsurePath(String s) {
- return null;
- }
-
- @Override
- public void clearWatcherReferences(Watcher watcher) {
-
- }
-
- @Override
- public boolean blockUntilConnected(int i, TimeUnit timeUnit) throws InterruptedException {
- return false;
- }
-
- @Override
- public void blockUntilConnected() throws InterruptedException {
-
- }
-
- @Override
- public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework() {
- return null;
- }
-
- @Override
- public ConnectionStateErrorPolicy getConnectionStateErrorPolicy() {
- return null;
- }
-
- @Override
- public QuorumVerifier getCurrentConfig() {
- return null;
- }
-
- @Override
- public SchemaSet getSchemaSet() {
- return null;
- }
-
- @Override
- public boolean isZk34CompatibilityMode() {
- return false;
- }
-
- @Override
- public CompletableFuture<Void> runSafe(Runnable runnable) {
- return null;
- }
- };
-
- }
-
- @Override
- public ConnectionStateErrorPolicy getConnectionStateErrorPolicy() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public QuorumVerifier getCurrentConfig() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public SchemaSet getSchemaSet() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public boolean isZk34CompatibilityMode() {
- return false;
- }
-
- @Override
- public CompletableFuture<Void> runSafe(Runnable runnable) {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public SyncBuilder sync() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- @Override
- public RemoveWatchesBuilder watches() {
- throw new UnsupportedOperationException("Not implemented in MockCurator");
- }
-
- }
-
}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
new file mode 100644
index 00000000000..401c6e90cd8
--- /dev/null
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java
@@ -0,0 +1,1356 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.curator.mock;
+
+import com.google.common.util.concurrent.UncheckedTimeoutException;
+import com.yahoo.collections.Pair;
+import com.yahoo.concurrent.Lock;
+import com.yahoo.concurrent.Locks;
+import com.yahoo.path.Path;
+import com.yahoo.vespa.curator.CompletionTimeoutException;
+import com.yahoo.vespa.curator.Curator;
+import com.yahoo.vespa.curator.recipes.CuratorLockException;
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+import org.apache.curator.framework.api.ACLCreateModeBackgroundPathAndBytesable;
+import org.apache.curator.framework.api.ACLCreateModePathAndBytesable;
+import org.apache.curator.framework.api.ACLCreateModeStatBackgroundPathAndBytesable;
+import org.apache.curator.framework.api.ACLPathAndBytesable;
+import org.apache.curator.framework.api.ACLableExistBuilderMain;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.BackgroundPathAndBytesable;
+import org.apache.curator.framework.api.BackgroundPathable;
+import org.apache.curator.framework.api.BackgroundVersionable;
+import org.apache.curator.framework.api.ChildrenDeletable;
+import org.apache.curator.framework.api.CreateBackgroundModeStatACLable;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.CreateBuilder2;
+import org.apache.curator.framework.api.CreateBuilderMain;
+import org.apache.curator.framework.api.CreateProtectACLCreateModePathAndBytesable;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.DeleteBuilder;
+import org.apache.curator.framework.api.DeleteBuilderMain;
+import org.apache.curator.framework.api.ErrorListenerPathAndBytesable;
+import org.apache.curator.framework.api.ErrorListenerPathable;
+import org.apache.curator.framework.api.ExistsBuilder;
+import org.apache.curator.framework.api.GetACLBuilder;
+import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.framework.api.GetConfigBuilder;
+import org.apache.curator.framework.api.GetDataBuilder;
+import org.apache.curator.framework.api.GetDataWatchBackgroundStatable;
+import org.apache.curator.framework.api.PathAndBytesable;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.curator.framework.api.ProtectACLCreateModeStatPathAndBytesable;
+import org.apache.curator.framework.api.ReconfigBuilder;
+import org.apache.curator.framework.api.RemoveWatchesBuilder;
+import org.apache.curator.framework.api.SetACLBuilder;
+import org.apache.curator.framework.api.SetDataBackgroundVersionable;
+import org.apache.curator.framework.api.SetDataBuilder;
+import org.apache.curator.framework.api.SyncBuilder;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.api.VersionPathAndBytesable;
+import org.apache.curator.framework.api.WatchPathable;
+import org.apache.curator.framework.api.Watchable;
+import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
+import org.apache.curator.framework.api.transaction.CuratorTransaction;
+import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
+import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.apache.curator.framework.api.transaction.TransactionCheckBuilder;
+import org.apache.curator.framework.api.transaction.TransactionCreateBuilder;
+import org.apache.curator.framework.api.transaction.TransactionCreateBuilder2;
+import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder;
+import org.apache.curator.framework.api.transaction.TransactionOp;
+import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.recipes.atomic.AtomicStats;
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.framework.schema.SchemaSet;
+import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryForever;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A mock implementation of{@link CuratorFramework} for testing purposes.
+ *
+ * @author mpolden
+ */
+public class MockCuratorFramework implements CuratorFramework {
+
+ private final boolean shouldTimeoutOnEnter;
+ private final boolean stableOrdering;
+ private final Locks<String> locks = new Locks<>(Long.MAX_VALUE, TimeUnit.DAYS);
+
+ /** The file system used by this mock to store zookeeper files and directories */
+ private final MemoryFileSystem fileSystem = new MemoryFileSystem();
+
+ /** Atomic counters. A more accurate mock would store these as files in the file system */
+ private final Map<String, MockAtomicCounter> atomicCounters = new ConcurrentHashMap<>();
+
+ /** Listeners to changes to a particular path */
+ private final ListenerMap listeners = new ListenerMap();
+
+ private CuratorFrameworkState curatorState = CuratorFrameworkState.LATENT;
+ private int monotonicallyIncreasingNumber = 0;
+
+ public MockCuratorFramework(boolean stableOrdering, boolean shouldTimeoutOnEnter) {
+ this.stableOrdering = stableOrdering;
+ this.shouldTimeoutOnEnter = shouldTimeoutOnEnter;
+ }
+
+ public Map<String, MockAtomicCounter> atomicCounters() {
+ return Collections.unmodifiableMap(atomicCounters);
+ }
+
+ public MemoryFileSystem fileSystem() {
+ return fileSystem;
+ }
+
+ @Override
+ public void start() {
+ curatorState = CuratorFrameworkState.STARTED;
+ }
+
+ @Override
+ public void close() {
+ curatorState = CuratorFrameworkState.STOPPED;
+ }
+
+ @Override
+ public CuratorFrameworkState getState() {
+ return curatorState;
+ }
+
+ @Override
+ @Deprecated
+ public boolean isStarted() {
+ return curatorState == CuratorFrameworkState.STARTED;
+ }
+
+ @Override
+ public CreateBuilder create() {
+ return new MockCreateBuilder();
+ }
+
+ @Override
+ public DeleteBuilder delete() {
+ return new MockDeleteBuilder();
+ }
+
+ @Override
+ public ExistsBuilder checkExists() {
+ return new MockExistsBuilder();
+ }
+
+ @Override
+ public GetDataBuilder getData() {
+ return new MockGetDataBuilder();
+ }
+
+ @Override
+ public SetDataBuilder setData() {
+ return new MockSetDataBuilder();
+ }
+
+ @Override
+ public GetChildrenBuilder getChildren() {
+ return new MockGetChildrenBuilder();
+ }
+
+ @Override
+ public GetACLBuilder getACL() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public SetACLBuilder setACL() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+
+ @Override
+ public ReconfigBuilder reconfig() { throw new UnsupportedOperationException("Not implemented in MockCurator"); }
+
+ @Override
+ public GetConfigBuilder getConfig() { throw new UnsupportedOperationException("Not implemented in MockCurator"); }
+
+ @Override
+ public CuratorTransaction inTransaction() {
+ return new MockCuratorTransactionFinal();
+ }
+
+ @Override
+ public CuratorMultiTransaction transaction() { throw new UnsupportedOperationException("Not implemented in MockCurator"); }
+
+ @Override
+ public TransactionOp transactionOp() { throw new UnsupportedOperationException("Not implemented in MockCurator"); }
+
+ @Override
+ public RemoveWatchesBuilder watches() { throw new UnsupportedOperationException("Not implemented in MockCurator"); }
+
+ @Override
+ public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework() { throw new UnsupportedOperationException("Not implemented in MockCurator"); }
+
+ @Override
+ public ConnectionStateErrorPolicy getConnectionStateErrorPolicy() { throw new UnsupportedOperationException("Not implemented in MockCurator"); }
+
+ @Override
+ public QuorumVerifier getCurrentConfig() { throw new UnsupportedOperationException("Not implemented in MockCurator"); }
+
+ @Override
+ public SchemaSet getSchemaSet() { throw new UnsupportedOperationException("Not implemented in MockCurator"); }
+
+ @Override
+ public boolean isZk34CompatibilityMode() { return false; }
+
+ @Override
+ public CompletableFuture<Void> runSafe(Runnable runnable) { throw new UnsupportedOperationException("Not implemented in MockCurator"); }
+
+ @Override
+ @Deprecated
+ public void sync(String path, Object backgroundContextObject) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public void createContainers(String s) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public Listenable<ConnectionStateListener> getConnectionStateListenable() {
+ return new MockListenable<>();
+ }
+
+ @Override
+ public Listenable<CuratorListener> getCuratorListenable() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ @Deprecated
+ public CuratorFramework nonNamespaceView() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public CuratorFramework usingNamespace(String newNamespace) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public String getNamespace() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public CuratorZookeeperClient getZookeeperClient() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Deprecated
+ @Override
+ public EnsurePath newNamespaceAwareEnsurePath(String path) {
+ return new EnsurePath(path);
+ }
+
+ @Override
+ public void clearWatcherReferences(Watcher watcher) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public boolean blockUntilConnected(int i, TimeUnit timeUnit) {
+ return true;
+ }
+
+ @Override
+ public void blockUntilConnected() {
+ }
+
+ @Override
+ public SyncBuilder sync() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ // ----- Factory methods for mocks */
+
+ public InterProcessLock createMutex(String path) {
+ return new MockLock(path);
+ }
+
+ public MockAtomicCounter createAtomicCounter(String path) {
+ return atomicCounters.computeIfAbsent(path, (k) -> new MockAtomicCounter(path));
+ }
+
+ public Curator.CompletionWaiter createCompletionWaiter() {
+ return new MockCompletionWaiter();
+ }
+
+ public Curator.DirectoryCache createDirectoryCache(String path) {
+ return new MockDirectoryCache(Path.fromString(path));
+ }
+
+ public Curator.FileCache createFileCache(String path) {
+ return new MockFileCache(Path.fromString(path));
+ }
+
+ // ----- Start of adaptor methods from Curator to the mock file system -----
+
+ /** Creates a node below the given directory root */
+ private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, MemoryFileSystem.Node root, Listeners listeners)
+ throws KeeperException.NodeExistsException, KeeperException.NoNodeException {
+ validatePath(pathString);
+ Path path = Path.fromString(pathString);
+ if (path.isRoot()) return "/"; // the root already exists
+ MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), createParents);
+ String name = nodeName(path.getName(), createMode);
+
+ if (parent == null)
+ throw new KeeperException.NoNodeException(path.getParentPath().toString());
+ if (parent.children().containsKey(path.getName()))
+ throw new KeeperException.NodeExistsException(path.toString());
+
+ parent.add(name).setContent(content);
+ String nodePath = "/" + path.getParentPath().toString() + "/" + name;
+ listeners.notify(Path.fromString(nodePath), content, PathChildrenCacheEvent.Type.CHILD_ADDED);
+ return nodePath;
+ }
+
+ /** Deletes a node below the given directory root */
+ private void deleteNode(String pathString, boolean deleteChildren, MemoryFileSystem.Node root, Listeners listeners)
+ throws KeeperException.NoNodeException, KeeperException.NotEmptyException {
+ validatePath(pathString);
+ Path path = Path.fromString(pathString);
+ MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false);
+ if (parent == null) throw new KeeperException.NoNodeException(path.toString());
+ MemoryFileSystem.Node node = parent.children().get(path.getName());
+ if (node == null) throw new KeeperException.NoNodeException(path.getName() + " under " + parent);
+ if ( ! node.children().isEmpty() && ! deleteChildren)
+ throw new KeeperException.NotEmptyException(path.toString());
+ parent.remove(path.getName());
+ listeners.notify(path, new byte[0], PathChildrenCacheEvent.Type.CHILD_REMOVED);
+ }
+
+ /** Returns the data of a node */
+ private byte[] getData(String pathString, MemoryFileSystem.Node root) throws KeeperException.NoNodeException {
+ validatePath(pathString);
+ return getNode(pathString, root).getContent();
+ }
+
+ /** sets the data of an existing node */
+ private void setData(String pathString, byte[] content, MemoryFileSystem.Node root, Listeners listeners)
+ throws KeeperException.NoNodeException {
+ validatePath(pathString);
+ getNode(pathString, root).setContent(content);
+ listeners.notify(Path.fromString(pathString), content, PathChildrenCacheEvent.Type.CHILD_UPDATED);
+ }
+
+ private List<String> getChildren(String path, MemoryFileSystem.Node root) throws KeeperException.NoNodeException {
+ validatePath(path);
+ MemoryFileSystem.Node node = root.getNode(Paths.get(path), false);
+ if (node == null) throw new KeeperException.NoNodeException(path);
+ List<String> children = new ArrayList<>(node.children().keySet());
+ if (! stableOrdering)
+ Collections.shuffle(children);
+ return children;
+ }
+
+ /** Returns a node or throws the appropriate exception if it doesn't exist */
+ private MemoryFileSystem.Node getNode(String pathString, MemoryFileSystem.Node root) throws KeeperException.NoNodeException {
+ validatePath(pathString);
+ Path path = Path.fromString(pathString);
+ MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false);
+ if (parent == null) throw new KeeperException.NoNodeException(path.toString());
+ MemoryFileSystem.Node node = parent.children().get(path.getName());
+ if (node == null) throw new KeeperException.NoNodeException(path.toString());
+ return node;
+ }
+
+ private String nodeName(String baseName, CreateMode createMode) {
+ switch (createMode) {
+ case PERSISTENT: case EPHEMERAL: return baseName;
+ case PERSISTENT_SEQUENTIAL: case EPHEMERAL_SEQUENTIAL: return baseName + monotonicallyIncreasingNumber++;
+ default: throw new UnsupportedOperationException(createMode + " support not implemented in MockCurator");
+ }
+ }
+
+ /** Validates a path using the same rules as ZooKeeper */
+ public static String validatePath(String path) throws IllegalArgumentException {
+ if (path == null) throw new IllegalArgumentException("Path cannot be null");
+ if (path.length() == 0) throw new IllegalArgumentException("Path length must be > 0");
+ if (path.charAt(0) != '/') throw new IllegalArgumentException("Path must start with / character");
+ if (path.length() == 1) return path; // done checking - it's the root
+ if (path.charAt(path.length() - 1) == '/')
+ throw new IllegalArgumentException("Path must not end with / character");
+
+ String reason = null;
+ char lastc = '/';
+ char[] chars = path.toCharArray();
+ char c;
+ for (int i = 1; i < chars.length; lastc = chars[i], i++) {
+ c = chars[i];
+
+ if (c == 0) {
+ reason = "null character not allowed @" + i;
+ break;
+ } else if (c == '/' && lastc == '/') {
+ reason = "empty node name specified @" + i;
+ break;
+ } else if (c == '.' && lastc == '.') {
+ if (chars[i-2] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) {
+ reason = "relative paths not allowed @" + i;
+ break;
+ }
+ } else if (c == '.') {
+ if (chars[i-1] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) {
+ reason = "relative paths not allowed @" + i;
+ break;
+ }
+ } else if (c > '\u0000' && c < '\u001f' || c > '\u007f' && c < '\u009F'
+ || c > '\ud800' && c < '\uf8ff' || c > '\ufff0' && c < '\uffff') {
+ reason = "invalid charater @" + i;
+ break;
+ }
+ }
+
+ if (reason != null)
+ throw new IllegalArgumentException("Invalid path string \"" + path + "\" caused by " + reason);
+ return path;
+ }
+
+ /**
+ * Invocation of changes to the file system state is abstracted through this to allow transactional
+ * changes to notify on commit
+ */
+ private abstract static class Listeners {
+
+ /** Translating method */
+ public final void notify(Path path, byte[] data, PathChildrenCacheEvent.Type type) {
+ String pathString = "/" + path.toString(); // this silly path class strips the leading "/" :-/
+ PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, new ChildData(pathString, null, data));
+ notify(path, event);
+ }
+
+ public abstract void notify(Path path, PathChildrenCacheEvent event);
+
+ }
+
+ /** The regular listener implementation which notifies registered file and directory listeners */
+ private class ListenerMap extends Listeners {
+
+ private final Map<Path, PathChildrenCacheListener> directoryListeners = new ConcurrentHashMap<>();
+ private final Map<Path, NodeCacheListener> fileListeners = new ConcurrentHashMap<>();
+
+ public void add(Path path, PathChildrenCacheListener listener) {
+ directoryListeners.put(path, listener);
+ }
+
+ public void add(Path path, NodeCacheListener listener) {
+ fileListeners.put(path, listener);
+ }
+
+ @Override
+ public void notify(Path path, PathChildrenCacheEvent event) {
+ try {
+ // Snapshot directoryListeners in case notification leads to new directoryListeners added
+ Set<Map.Entry<Path, PathChildrenCacheListener>> directoryListenerSnapshot = new HashSet<>(directoryListeners.entrySet());
+ for (Map.Entry<Path, PathChildrenCacheListener> listener : directoryListenerSnapshot) {
+ if (path.isChildOf(listener.getKey()))
+ listener.getValue().childEvent(MockCuratorFramework.this, event);
+ }
+
+ // Snapshot directoryListeners in case notification leads to new directoryListeners added
+ Set<Map.Entry<Path, NodeCacheListener>> fileListenerSnapshot = new HashSet<>(fileListeners.entrySet());
+ for (Map.Entry<Path, NodeCacheListener> listener : fileListenerSnapshot) {
+ if (path.equals(listener.getKey()))
+ listener.getValue().nodeChanged();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace(); // TODO: Remove
+ throw new RuntimeException("Exception notifying listeners", e);
+ }
+ }
+
+ }
+
+ private class MockCompletionWaiter implements Curator.CompletionWaiter {
+
+ @Override
+ public void awaitCompletion(Duration timeout) {
+ if (shouldTimeoutOnEnter) {
+ throw new CompletionTimeoutException("");
+ }
+ }
+
+ @Override
+ public void notifyCompletion() {
+ }
+
+ }
+
+ /** A lock which works inside a single vm */
+ private class MockLock extends InterProcessSemaphoreMutex {
+
+ public boolean timeoutOnLock = false;
+ public boolean throwExceptionOnLock = false;
+
+ private final String path;
+
+ private Lock lock = null;
+
+ public MockLock(String path) {
+ super(MockCuratorFramework.this, path);
+ this.path = path;
+ }
+
+ @Override
+ public boolean acquire(long timeout, TimeUnit unit) {
+ if (throwExceptionOnLock)
+ throw new CuratorLockException("Thrown by mock");
+ if (timeoutOnLock) return false;
+
+ try {
+ lock = locks.lock(path, timeout, unit);
+ return true;
+ }
+ catch (UncheckedTimeoutException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void acquire() {
+ if (throwExceptionOnLock)
+ throw new CuratorLockException("Thrown by mock");
+
+ lock = locks.lock(path);
+ }
+
+ @Override
+ public void release() {
+ if (lock != null)
+ lock.close();
+ }
+
+ }
+
+ private class MockAtomicCounter extends DistributedAtomicLong {
+
+ private boolean initialized = false;
+ private MockLongValue value = new MockLongValue(0); // yes, uninitialized returns 0 :-/
+
+ public MockAtomicCounter(String path) {
+ super(MockCuratorFramework.this, path, new RetryForever(1_000));
+ }
+
+ @Override
+ public boolean initialize(Long value) {
+ if (initialized) return false;
+ this.value = new MockLongValue(value);
+ initialized = true;
+ return true;
+ }
+
+ @Override
+ public AtomicValue<Long> get() {
+ if (value == null) return new MockLongValue(0);
+ return value;
+ }
+
+ public AtomicValue<Long> add(Long delta) {
+ return trySet(value.postValue() + delta);
+ }
+
+ public AtomicValue<Long> subtract(Long delta) {
+ return trySet(value.postValue() - delta);
+ }
+
+ @Override
+ public AtomicValue<Long> increment() {
+ return trySet(value.postValue() + 1);
+ }
+
+ public AtomicValue<Long> decrement() {
+ return trySet(value.postValue() - 1);
+ }
+
+ @Override
+ public AtomicValue<Long> trySet(Long longval) {
+ value = new MockLongValue(longval);
+ return value;
+ }
+
+ public void forceSet(Long newValue) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ public AtomicValue<Long> compareAndSet(Long expectedValue, Long newValue) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ }
+
+ private static class MockLongValue implements AtomicValue<Long> {
+
+ private final AtomicLong value = new AtomicLong();
+
+ public MockLongValue(long value) {
+ this.value.set(value);
+ }
+
+ @Override
+ public boolean succeeded() {
+ return true;
+ }
+
+ public void setValue(long value) {
+ this.value.set(value);
+ }
+
+ @Override
+ public Long preValue() {
+ return value.get();
+ }
+
+ @Override
+ public Long postValue() {
+ return value.get();
+ }
+
+ @Override
+ public AtomicStats getStats() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ }
+
+ private class MockDirectoryCache implements Curator.DirectoryCache {
+
+ /** The path this is caching and listening to */
+ private final Path path;
+
+ public MockDirectoryCache(Path path) {
+ this.path = path;
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void addListener(PathChildrenCacheListener listener) {
+ listeners.add(path, listener);
+ }
+
+ @Override
+ public List<ChildData> getCurrentData() {
+ List<ChildData> childData = new ArrayList<>();
+ for (String childName : getChildren(path)) {
+ Path childPath = path.append(childName);
+ childData.add(new ChildData(childPath.getAbsolute(), null, getData(childPath).get()));
+ }
+ return childData;
+ }
+
+ @Override
+ public ChildData getCurrentData(Path fullPath) {
+ if (!fullPath.getParentPath().equals(path)) {
+ throw new IllegalArgumentException("Path '" + fullPath + "' is not a child path of '" + path + "'");
+ }
+
+ return getData(fullPath).map(bytes -> new ChildData(fullPath.getAbsolute(), null, bytes)).orElse(null);
+ }
+
+ @Override
+ public void close() {}
+
+ private List<String> getChildren(Path path) {
+ try {
+ return MockCuratorFramework.this.getChildren().forPath(path.getAbsolute());
+ } catch (KeeperException.NoNodeException e) {
+ return List.of();
+ } catch (Exception e) {
+ throw new RuntimeException("Could not get children of " + path.getAbsolute(), e);
+ }
+ }
+
+ private Optional<byte[]> getData(Path path) {
+ try {
+ return Optional.of(MockCuratorFramework.this.getData().forPath(path.getAbsolute()));
+ }
+ catch (KeeperException.NoNodeException e) {
+ return Optional.empty();
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Could not get data at " + path.getAbsolute(), e);
+ }
+ }
+
+ }
+
+ private class MockFileCache implements Curator.FileCache {
+
+ /** The path this is caching and listening to */
+ private final Path path;
+
+ public MockFileCache(Path path) {
+ this.path = path;
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void addListener(NodeCacheListener listener) {
+ listeners.add(path, listener);
+ }
+
+ @Override
+ public ChildData getCurrentData() {
+ MemoryFileSystem.Node node = fileSystem.root().getNode(Paths.get(path.toString()), false);
+ if (node == null) return null;
+ return new ChildData("/" + path.toString(), null, node.getContent());
+ }
+
+ @Override
+ public void close() {}
+
+ }
+
+
+ // ----- The rest of this file is adapting the Curator (non-recipe) API to the -----
+ // ----- file system methods above. -----
+ // ----- There's nothing to see unless you are interested in an illustration of -----
+ // ----- the folly of fluent API's or, more generally, mankind. -----
+ private abstract static class MockProtectACLCreateModeStatPathAndBytesable<String>
+ implements ProtectACLCreateModeStatPathAndBytesable<String> {
+
+ public BackgroundPathAndBytesable<String> withACL(List<ACL> list) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ public BackgroundPathAndBytesable<String> withACL(List<ACL> list, boolean b) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ public ProtectACLCreateModeStatPathAndBytesable<String> withMode(CreateMode createMode) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ACLCreateModeBackgroundPathAndBytesable<java.lang.String> withProtection() {
+ return null;
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground() {
+ return null;
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(Object o) {
+ return null;
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback) {
+ return null;
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o) {
+ return null;
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
+ return null;
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
+ return null;
+ }
+
+ @Override
+ public ACLBackgroundPathAndBytesable<String> storingStatIn(Stat stat) {
+ return null;
+ }
+
+ }
+
+ private class MockCreateBuilder implements CreateBuilder {
+
+ private boolean createParents = false;
+ private CreateMode createMode = CreateMode.PERSISTENT;
+
+ @Override
+ public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentsIfNeeded() {
+ createParents = true;
+ return new MockProtectACLCreateModeStatPathAndBytesable<>() {
+
+ @Override
+ public String forPath(String s, byte[] bytes) throws Exception {
+ return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners);
+ }
+
+ @Override
+ public String forPath(String s) throws Exception {
+ return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners);
+ }
+
+ };
+ }
+
+ @Override
+ public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentContainersIfNeeded() {
+ return new MockProtectACLCreateModeStatPathAndBytesable<>() {
+
+ @Override
+ public String forPath(String s, byte[] bytes) throws Exception {
+ return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners);
+ }
+
+ @Override
+ public String forPath(String s) throws Exception {
+ return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners);
+ }
+
+ };
+ }
+
+ @Override
+ @Deprecated
+ public ACLPathAndBytesable<String> withProtectedEphemeralSequential() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ACLCreateModeStatBackgroundPathAndBytesable<String> withProtection() {
+ return null;
+ }
+
+ public String forPath(String s) throws Exception {
+ return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners);
+ }
+
+ public String forPath(String s, byte[] bytes) throws Exception {
+ return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners);
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(Object o) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<String> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public CreateBuilderMain withTtl(long l) {
+ return null;
+ }
+
+ @Override
+ public CreateBuilder2 orSetData() {
+ return null;
+ }
+
+ @Override
+ public CreateBuilder2 orSetData(int i) {
+ return null;
+ }
+
+ @Override
+ public CreateBackgroundModeStatACLable compressed() {
+ return null;
+ }
+
+ @Override
+ public CreateProtectACLCreateModePathAndBytesable<String> storingStatIn(Stat stat) {
+ return null;
+ }
+
+ @Override
+ public BackgroundPathAndBytesable<String> withACL(List<ACL> list) {
+ return null;
+ }
+
+ @Override
+ public ACLBackgroundPathAndBytesable<String> withMode(CreateMode createMode) {
+ this.createMode = createMode;
+ return this;
+ }
+
+ @Override
+ public BackgroundPathAndBytesable<String> withACL(List<ACL> list, boolean b) {
+ return null;
+ }
+ }
+
+ private static class MockBackgroundPathableBuilder<T> implements BackgroundPathable<T>, Watchable<BackgroundPathable<T>> {
+
+ @Override
+ public ErrorListenerPathable<T> inBackground() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<T> inBackground(Object o) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Object o) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<T> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public T forPath(String s) throws Exception {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public BackgroundPathable<T> watched() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public BackgroundPathable<T> usingWatcher(Watcher watcher) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public BackgroundPathable<T> usingWatcher(CuratorWatcher curatorWatcher) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+ }
+
+ private class MockGetChildrenBuilder extends MockBackgroundPathableBuilder<List<String>> implements GetChildrenBuilder {
+
+ @Override
+ public WatchPathable<List<String>> storingStatIn(Stat stat) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public List<String> forPath(String path) throws Exception {
+ return getChildren(path, fileSystem.root());
+ }
+
+ }
+
+ private class MockExistsBuilder extends MockBackgroundPathableBuilder<Stat> implements ExistsBuilder {
+
+ @Override
+ public Stat forPath(String path) throws Exception {
+ try {
+ MemoryFileSystem.Node node = getNode(path, fileSystem.root());
+ Stat stat = new Stat();
+ stat.setVersion(node.version());
+ return stat;
+ }
+ catch (KeeperException.NoNodeException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public ACLableExistBuilderMain creatingParentsIfNeeded() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ACLableExistBuilderMain creatingParentContainersIfNeeded() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+ }
+
+ private class MockDeleteBuilder extends MockBackgroundPathableBuilder<Void> implements DeleteBuilder {
+
+ private boolean deleteChildren = false;
+
+ @Override
+ public BackgroundVersionable deletingChildrenIfNeeded() {
+ deleteChildren = true;
+ return this;
+ }
+
+ @Override
+ public ChildrenDeletable guaranteed() {
+ return this;
+ }
+
+ @Override
+ public BackgroundPathable<Void> withVersion(int i) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ public Void forPath(String pathString) throws Exception {
+ deleteNode(pathString, deleteChildren, fileSystem.root(), listeners);
+ return null;
+ }
+
+ @Override
+ public DeleteBuilderMain quietly() {
+ return this;
+ }
+ }
+
+ private class MockGetDataBuilder extends MockBackgroundPathableBuilder<byte[]> implements GetDataBuilder {
+
+ @Override
+ public GetDataWatchBackgroundStatable decompressed() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ public byte[] forPath(String path) throws Exception {
+ return getData(path, fileSystem.root());
+ }
+
+ @Override
+ public ErrorListenerPathable<byte[]> inBackground() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<byte[]> inBackground(Object o) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback, Object o) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathable<byte[]> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public WatchPathable<byte[]> storingStatIn(Stat stat) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+ }
+
+ // extends MockBackgroundACLPathAndBytesableBuilder<Stat>
+ private class MockSetDataBuilder implements SetDataBuilder {
+
+ @Override
+ public SetDataBackgroundVersionable compressed() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public BackgroundPathAndBytesable<Stat> withVersion(int i) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public Stat forPath(String path, byte[] bytes) throws Exception {
+ setData(path, bytes, fileSystem.root(), listeners);
+ return null;
+ }
+
+ @Override
+ public Stat forPath(String s) throws Exception {
+ return null;
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<Stat> inBackground() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<Stat> inBackground(Object o) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Object o) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Executor executor) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ErrorListenerPathAndBytesable<Stat> inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+ }
+
+ /** Allows addition of directoryListeners which are never called */
+ private class MockListenable<T> implements Listenable<T> {
+
+ @Override
+ public void addListener(T t) {
+ }
+
+ @Override
+ public void addListener(T t, Executor executor) {
+ }
+
+ @Override
+ public void removeListener(T t) {
+ }
+
+ }
+
+ private class MockCuratorTransactionFinal implements CuratorTransactionFinal {
+
+ /** The new directory root in which the transactional changes are made */
+ private MemoryFileSystem.Node newRoot;
+
+ private boolean committed = false;
+
+ private final DelayedListener delayedListener = new DelayedListener();
+
+ public MockCuratorTransactionFinal() {
+ newRoot = fileSystem.root().clone();
+ }
+
+ @Override
+ public Collection<CuratorTransactionResult> commit() throws Exception {
+ fileSystem.replaceRoot(newRoot);
+ committed = true;
+ delayedListener.commit();
+ return null; // TODO
+ }
+
+ @Override
+ public TransactionCreateBuilder create() {
+ ensureNotCommitted();
+ return new MockTransactionCreateBuilder();
+ }
+
+ @Override
+ public TransactionDeleteBuilder delete() {
+ ensureNotCommitted();
+ return new MockTransactionDeleteBuilder();
+ }
+
+ @Override
+ public TransactionSetDataBuilder setData() {
+ ensureNotCommitted();
+ return new MockTransactionSetDataBuilder();
+ }
+
+ @Override
+ public TransactionCheckBuilder check() {
+ ensureNotCommitted();
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ private void ensureNotCommitted() {
+ if (committed) throw new IllegalStateException("transaction already committed");
+ }
+
+ private class MockTransactionCreateBuilder implements TransactionCreateBuilder {
+
+ private CreateMode createMode = CreateMode.PERSISTENT;
+
+ @Override
+ public ACLCreateModePathAndBytesable<CuratorTransactionBridge> compressed() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public ACLPathAndBytesable<CuratorTransactionBridge> withMode(CreateMode createMode) {
+ this.createMode = createMode;
+ return this;
+ }
+
+ @Override
+ public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception {
+ createNode(s, bytes, false, createMode, newRoot, delayedListener);
+ return new MockCuratorTransactionBridge();
+ }
+
+ @Override
+ public CuratorTransactionBridge forPath(String s) throws Exception {
+ createNode(s, new byte[0], false, createMode, newRoot, delayedListener);
+ return new MockCuratorTransactionBridge();
+ }
+
+ @Override
+ public TransactionCreateBuilder2 withTtl(long l) {
+ return this;
+ }
+
+ @Override
+ public Object withACL(List list, boolean b) {
+ return this;
+ }
+
+ @Override
+ public Object withACL(List list) {
+ return this;
+ }
+ }
+
+ private class MockTransactionDeleteBuilder implements TransactionDeleteBuilder {
+
+ @Override
+ public Pathable<CuratorTransactionBridge> withVersion(int i) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public CuratorTransactionBridge forPath(String path) throws Exception {
+ deleteNode(path, false, newRoot, delayedListener);
+ return new MockCuratorTransactionBridge();
+ }
+
+ }
+
+ private class MockTransactionSetDataBuilder implements TransactionSetDataBuilder {
+
+ @Override
+ public VersionPathAndBytesable<CuratorTransactionBridge> compressed() {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public PathAndBytesable<CuratorTransactionBridge> withVersion(int i) {
+ throw new UnsupportedOperationException("Not implemented in MockCurator");
+ }
+
+ @Override
+ public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception {
+ MockCuratorFramework.this.setData(s, bytes, newRoot, delayedListener);
+ return new MockCuratorTransactionBridge();
+ }
+
+ @Override
+ public CuratorTransactionBridge forPath(String s) throws Exception {
+ MockCuratorFramework.this.setData(s, new byte[0], newRoot, delayedListener);
+ return new MockCuratorTransactionBridge();
+ }
+
+ }
+
+ private class MockCuratorTransactionBridge implements CuratorTransactionBridge {
+
+ @Override
+ public CuratorTransactionFinal and() {
+ return MockCuratorTransactionFinal.this;
+ }
+
+ }
+
+ /** A class which collects listen events and forwards them to the regular directoryListeners on commit */
+ private class DelayedListener extends Listeners {
+
+ private final List<Pair<Path, PathChildrenCacheEvent>> events = new ArrayList<>();
+
+ @Override
+ public void notify(Path path, PathChildrenCacheEvent event) {
+ events.add(new Pair<>(path, event));
+ }
+
+ public void commit() {
+ for (Pair<Path, PathChildrenCacheEvent> event : events)
+ listeners.notify(event.getFirst(), event.getSecond());
+ }
+
+ }
+
+ }
+
+}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/package-info.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/package-info.java
index 07f0924cb31..777da5988eb 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/package-info.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/package-info.java
@@ -1,6 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
@ExportPackage
-@PublicApi
+@PublicApi // TODO: Revoke this on Vespa 8.
package com.yahoo.vespa.curator;
import com.yahoo.api.annotations.PublicApi;
import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java
new file mode 100644
index 00000000000..a518d8df843
--- /dev/null
+++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java
@@ -0,0 +1,74 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.curator;
+
+import com.yahoo.net.HostName;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author mpolden
+ */
+public class ConnectionSpecTest {
+
+ @Test
+ public void create() {
+ HostName.setHostNameForTestingOnly("host2");
+ Config config = new Config(List.of(new Config.Server("host1", 10001),
+ new Config.Server("host2", 10002),
+ new Config.Server("host3", 10003)));
+
+ {
+ ConnectionSpec spec = ConnectionSpec.create(config.servers, Config.Server::hostname, Config.Server::port, false);
+ assertEquals("host1:10001,host2:10002,host3:10003", spec.local());
+ assertEquals("host1:10001,host2:10002,host3:10003", spec.ensemble());
+ assertEquals(3, spec.ensembleSize());
+ }
+
+ {
+ ConnectionSpec specLocalAffinity = ConnectionSpec.create(config.servers, Config.Server::hostname, Config.Server::port, true);
+ assertEquals("host2:10002", specLocalAffinity.local());
+ assertEquals("host1:10001,host2:10002,host3:10003", specLocalAffinity.ensemble());
+ assertEquals(3, specLocalAffinity.ensembleSize());
+ }
+
+ {
+ ConnectionSpec specFromString = ConnectionSpec.create("host1:10001", "host1:10001,host2:10002");
+ assertEquals("host1:10001", specFromString.local());
+ assertEquals("host1:10001,host2:10002", specFromString.ensemble());
+ assertEquals(2, specFromString.ensembleSize());
+ }
+ }
+
+ private static class Config {
+
+ private final List<Server> servers;
+
+ public Config(List<Server> servers) {
+ this.servers = servers;
+ }
+
+ private static class Server {
+
+ private final String hostname;
+ private final int port;
+
+ public Server(String hostname, int port) {
+ this.hostname = hostname;
+ this.port = port;
+ }
+
+ public String hostname() {
+ return hostname;
+ }
+
+ public int port() {
+ return port;
+ }
+ }
+
+ }
+
+}
diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java
index 2bf40c4e2bb..5341efaefe5 100644
--- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java
+++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java
@@ -1,7 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.curator;
-import com.yahoo.cloud.config.ConfigserverConfig;
+import com.yahoo.cloud.config.CuratorConfig;
import com.yahoo.net.HostName;
import org.apache.curator.test.TestingServer;
import org.junit.After;
@@ -61,45 +61,33 @@ public class CuratorTest {
@Test
public void require_that_server_count_is_correct() {
- ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder();
- builder.zookeeperserver(createZKBuilder(localhost, port1));
- try (Curator curator = createCurator(new ConfigserverConfig(builder))) {
+ CuratorConfig.Builder builder = new CuratorConfig.Builder();
+ builder.server(createZKBuilder(localhost, port1));
+ try (Curator curator = createCurator(new CuratorConfig(builder))) {
assertEquals(1, curator.zooKeeperEnsembleCount());
}
}
- @Test
- public void localhost_affinity() {
- String localhostHostName = "myhost";
- int localhostPort = 123;
-
- ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder();
- builder.zookeeperserver(createZKBuilder(localhostHostName, localhostPort));
- builder.zookeeperserver(createZKBuilder("otherhost", 345));
- ConfigserverConfig config = new ConfigserverConfig(builder);
-
- HostName.setHostNameForTestingOnly(localhostHostName);
-
- String localhostSpec = localhostHostName + ":" + localhostPort;
- assertEquals(localhostSpec, Curator.createConnectionSpecForLocalhost(config));
- }
-
- private ConfigserverConfig createTestConfig() {
- ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder();
- builder.zookeeperserver(createZKBuilder(localhost, port1));
- builder.zookeeperserver(createZKBuilder(localhost, port2));
- return new ConfigserverConfig(builder);
+ private CuratorConfig createTestConfig() {
+ CuratorConfig.Builder builder = new CuratorConfig.Builder();
+ builder.server(createZKBuilder(localhost, port1));
+ builder.server(createZKBuilder(localhost, port2));
+ return new CuratorConfig(builder);
}
- private ConfigserverConfig.Zookeeperserver.Builder createZKBuilder(String hostname, int port) {
- ConfigserverConfig.Zookeeperserver.Builder zkBuilder = new ConfigserverConfig.Zookeeperserver.Builder();
+ private CuratorConfig.Server.Builder createZKBuilder(String hostname, int port) {
+ CuratorConfig.Server.Builder zkBuilder = new CuratorConfig.Server.Builder();
zkBuilder.hostname(hostname);
zkBuilder.port(port);
return zkBuilder;
}
- private Curator createCurator(ConfigserverConfig configserverConfig) {
- return new Curator(configserverConfig, Optional.empty());
+ private Curator createCurator(CuratorConfig curatorConfig) {
+ return new Curator(ConnectionSpec.create(curatorConfig.server(),
+ CuratorConfig.Server::hostname,
+ CuratorConfig.Server::port,
+ curatorConfig.zookeeperLocalhostAffinity()),
+ Optional.empty());
}
private static class PortAllocator {