From 02371ebcb2754ce99ee8182a870ac00ccff3f97e Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Fri, 20 Nov 2020 10:34:51 +0100 Subject: Extract ConnectionSpec --- .../com/yahoo/vespa/curator/ConnectionSpec.java | 102 ++++++++++++++++++++ .../main/java/com/yahoo/vespa/curator/Curator.java | 104 ++++++--------------- .../com/yahoo/vespa/curator/mock/MockCurator.java | 2 +- .../yahoo/vespa/curator/ConnectionSpecTest.java | 74 +++++++++++++++ .../java/com/yahoo/vespa/curator/CuratorTest.java | 16 ---- 5 files changed, 207 insertions(+), 91 deletions(-) create mode 100644 zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java create mode 100644 zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java new file mode 100644 index 00000000000..4409291419a --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java @@ -0,0 +1,102 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator; + +import com.yahoo.net.HostName; + +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +/** + * A connection spec for Curator. + * + * @author mpolden + */ +class ConnectionSpec { + + private final String local; + private final String ensemble; + private final int ensembleSize; + + private ConnectionSpec(String local, String ensemble, int ensembleSize) { + this.local = requireNonEmpty(local, "local spec"); + this.ensemble = requireNonEmpty(ensemble, "ensemble spec"); + this.ensembleSize = ensembleSize; + } + + /** Returns the local spec. This may be a subset of the ensemble spec */ + public String local() { + return local; + } + + /** Returns the ensemble spec. This always contains all nodes in the ensemble */ + public String ensemble() { + return ensemble; + } + + /** Returns the number of servers in the ensemble */ + public int ensembleSize() { + return ensembleSize; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ConnectionSpec that = (ConnectionSpec) o; + return ensembleSize == that.ensembleSize && + local.equals(that.local) && + ensemble.equals(that.ensemble); + } + + @Override + public int hashCode() { + return Objects.hash(local, ensemble, ensembleSize); + } + + public static ConnectionSpec create(String spec) { + return create(spec, spec); + } + + public static ConnectionSpec create(String localSpec, String ensembleSpec) { + return new ConnectionSpec(localSpec, ensembleSpec, ensembleSpec.split(",").length); + } + + public static ConnectionSpec create(List servers, + Function hostnameGetter, + Function portGetter, + boolean localhostAffinity) { + String localSpec = createSpec(servers, hostnameGetter, portGetter, localhostAffinity); + String ensembleSpec = localhostAffinity ? createSpec(servers, hostnameGetter, portGetter, false) : localSpec; + return new ConnectionSpec(localSpec, ensembleSpec, servers.size()); + } + + private static String createSpec(List servers, + Function hostnameGetter, + Function portGetter, + boolean localhostAffinity) { + String thisServer = HostName.getLocalhost(); + StringBuilder connectionSpec = new StringBuilder(); + for (var server : servers) { + if (localhostAffinity && !thisServer.equals(hostnameGetter.apply(server))) continue; + connectionSpec.append(hostnameGetter.apply(server)); + connectionSpec.append(':'); + connectionSpec.append(portGetter.apply(server)); + connectionSpec.append(','); + } + if (localhostAffinity && connectionSpec.length() == 0) { + throw new IllegalArgumentException("Unable to create connect string to localhost: " + + "There is no localhost server specified in config"); + } + if (connectionSpec.length() > 0) { + connectionSpec.setLength(connectionSpec.length() - 1); // Remove trailing comma + } + return connectionSpec.toString(); + } + + private static String requireNonEmpty(String s, String field) { + if (Objects.requireNonNull(s).isEmpty()) throw new IllegalArgumentException(field + " must be non-empty"); + return s; + } + +} diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java index 6cbfa274c56..4aaae38f939 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java @@ -4,7 +4,6 @@ package com.yahoo.vespa.curator; import com.google.inject.Inject; import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.io.IOUtils; -import com.yahoo.net.HostName; import com.yahoo.path.Path; import com.yahoo.text.Utf8; import com.yahoo.vespa.curator.recipes.CuratorCounter; @@ -55,81 +54,71 @@ public class Curator implements AutoCloseable { private static final Duration ZK_CONNECTION_TIMEOUT = Duration.ofSeconds(30); private static final Duration BASE_SLEEP_TIME = Duration.ofSeconds(1); private static final int MAX_RETRIES = 10; + private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES); protected final RetryPolicy retryPolicy; private final CuratorFramework curatorFramework; - private final String connectionSpec; // May be a subset of the servers in the ensemble - private final String zooKeeperEnsembleConnectionSpec; - private final int zooKeeperEnsembleCount; + private final ConnectionSpec connectionSpec; // All lock keys, to allow re-entrancy. This will grow forever, but this should be too slow to be a problem private final ConcurrentHashMap locks = new ConcurrentHashMap<>(); /** Creates a curator instance from a comma-separated string of ZooKeeper host:port strings */ public static Curator create(String connectionSpec) { - return new Curator(connectionSpec, connectionSpec, Optional.of(ZK_CLIENT_CONFIG_FILE)); + return new Curator(ConnectionSpec.create(connectionSpec), Optional.of(ZK_CLIENT_CONFIG_FILE)); } // For testing only, use Optional.empty for clientConfigFile parameter to create default zookeeper client config public static Curator create(String connectionSpec, Optional clientConfigFile) { - return new Curator(connectionSpec, connectionSpec, clientConfigFile); + return new Curator(ConnectionSpec.create(connectionSpec), clientConfigFile); } // Depend on ZooKeeperServer to make sure it is started first - // TODO: Move zookeeperserver config out of configserverconfig (requires update of controller services.xml as well) + // TODO: This can be removed when this package is no longer public API. @Inject - public Curator(ConfigserverConfig configserverConfig, VespaZooKeeperServer server) { + public Curator(ConfigserverConfig configserverConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) { this(configserverConfig, Optional.of(ZK_CLIENT_CONFIG_FILE)); } Curator(ConfigserverConfig configserverConfig, Optional clientConfigFile) { - this(createConnectionSpec(configserverConfig), createEnsembleConnectionSpec(configserverConfig), clientConfigFile); - } - - private Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Optional clientConfigFile) { - this(connectionSpec, - zooKeeperEnsembleConnectionSpec, - (retryPolicy) -> CuratorFrameworkFactory - .builder() - .retryPolicy(retryPolicy) - .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis()) - .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis()) - .connectString(connectionSpec) - .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile))) - .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8 - .build()); + this(ConnectionSpec.create(configserverConfig.zookeeperserver(), + ConfigserverConfig.Zookeeperserver::hostname, + ConfigserverConfig.Zookeeperserver::port, + configserverConfig.zookeeperLocalhostAffinity()), + clientConfigFile); } protected Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Function curatorFactory) { - this(connectionSpec, zooKeeperEnsembleConnectionSpec, curatorFactory, - new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES)); + this(ConnectionSpec.create(connectionSpec, zooKeeperEnsembleConnectionSpec), curatorFactory, DEFAULT_RETRY_POLICY); } - private Curator(String connectionSpec, - String zooKeeperEnsembleConnectionSpec, + private Curator(ConnectionSpec connectionSpec, Optional clientConfigFile) { + this(connectionSpec, + (retryPolicy) -> CuratorFrameworkFactory + .builder() + .retryPolicy(retryPolicy) + .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis()) + .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis()) + .connectString(connectionSpec.local()) + .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile))) + .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8 + .build(), + DEFAULT_RETRY_POLICY); + } + + private Curator(ConnectionSpec connectionSpec, Function curatorFactory, RetryPolicy retryPolicy) { this.connectionSpec = connectionSpec; this.retryPolicy = retryPolicy; this.curatorFramework = curatorFactory.apply(retryPolicy); if (this.curatorFramework != null) { - validateConnectionSpec(connectionSpec); - validateConnectionSpec(zooKeeperEnsembleConnectionSpec); addLoggingListener(); curatorFramework.start(); } - - this.zooKeeperEnsembleConnectionSpec = zooKeeperEnsembleConnectionSpec; - this.zooKeeperEnsembleCount = zooKeeperEnsembleConnectionSpec.split(",").length; - } - - private static String createConnectionSpec(ConfigserverConfig configserverConfig) { - return configserverConfig.zookeeperLocalhostAffinity() - ? createConnectionSpecForLocalhost(configserverConfig) - : createEnsembleConnectionSpec(configserverConfig); } private static ZKClientConfig createClientConfig(Optional clientConfigFile) { @@ -148,39 +137,6 @@ public class Curator implements AutoCloseable { } } - private static String createEnsembleConnectionSpec(ConfigserverConfig config) { - StringBuilder connectionSpec = new StringBuilder(); - for (int i = 0; i < config.zookeeperserver().size(); i++) { - if (connectionSpec.length() > 0) { - connectionSpec.append(','); - } - ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i); - connectionSpec.append(server.hostname()); - connectionSpec.append(':'); - connectionSpec.append(server.port()); - } - return connectionSpec.toString(); - } - - static String createConnectionSpecForLocalhost(ConfigserverConfig config) { - String thisServer = HostName.getLocalhost(); - - for (int i = 0; i < config.zookeeperserver().size(); i++) { - ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i); - if (thisServer.equals(server.hostname())) { - return String.format("%s:%d", server.hostname(), server.port()); - } - } - - throw new IllegalArgumentException("Unable to create connect string to localhost: " + - "There is no localhost server specified in config: " + config); - } - - private static void validateConnectionSpec(String connectionSpec) { - if (connectionSpec == null || connectionSpec.isEmpty()) - throw new IllegalArgumentException(String.format("Connections spec '%s' is not valid", connectionSpec)); - } - /** * Returns the ZooKeeper "connect string" used by curator: a comma-separated list of * host:port of ZooKeeper endpoints to connect to. This may be a subset of @@ -189,7 +145,7 @@ public class Curator implements AutoCloseable { * * This may be empty but never null */ - public String connectionSpec() { return connectionSpec; } + public String connectionSpec() { return connectionSpec.local(); } /** For internal use; prefer creating a {@link CuratorCounter} */ public DistributedAtomicLong createAtomicCounter(String path) { @@ -432,7 +388,7 @@ public class Curator implements AutoCloseable { * TODO: Move method out of this class. */ public String zooKeeperEnsembleConnectionSpec() { - return zooKeeperEnsembleConnectionSpec; + return connectionSpec.ensemble(); } /** @@ -440,7 +396,7 @@ public class Curator implements AutoCloseable { * WARNING: This may be different from the number of servers this Curator may connect to. * TODO: Move method out of this class. */ - public int zooKeeperEnsembleCount() { return zooKeeperEnsembleCount; } + public int zooKeeperEnsembleCount() { return connectionSpec.ensembleSize(); } private static Optional getEnvironmentVariable(String variableName) { return Optional.ofNullable(System.getenv().get(variableName)) diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java index 3da7678c44e..8e3b433354d 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java @@ -137,7 +137,7 @@ public class MockCurator extends Curator { * This is not what ZooKeeper does. */ public MockCurator(boolean stableOrdering) { - super("", "", (retryPolicy) -> null); + super("host1:10001", "host1:10001", (retryPolicy) -> null); this.stableOrdering = stableOrdering; curatorFramework = new MockCuratorFramework(); curatorFramework.start(); diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java new file mode 100644 index 00000000000..a518d8df843 --- /dev/null +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java @@ -0,0 +1,74 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator; + +import com.yahoo.net.HostName; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * @author mpolden + */ +public class ConnectionSpecTest { + + @Test + public void create() { + HostName.setHostNameForTestingOnly("host2"); + Config config = new Config(List.of(new Config.Server("host1", 10001), + new Config.Server("host2", 10002), + new Config.Server("host3", 10003))); + + { + ConnectionSpec spec = ConnectionSpec.create(config.servers, Config.Server::hostname, Config.Server::port, false); + assertEquals("host1:10001,host2:10002,host3:10003", spec.local()); + assertEquals("host1:10001,host2:10002,host3:10003", spec.ensemble()); + assertEquals(3, spec.ensembleSize()); + } + + { + ConnectionSpec specLocalAffinity = ConnectionSpec.create(config.servers, Config.Server::hostname, Config.Server::port, true); + assertEquals("host2:10002", specLocalAffinity.local()); + assertEquals("host1:10001,host2:10002,host3:10003", specLocalAffinity.ensemble()); + assertEquals(3, specLocalAffinity.ensembleSize()); + } + + { + ConnectionSpec specFromString = ConnectionSpec.create("host1:10001", "host1:10001,host2:10002"); + assertEquals("host1:10001", specFromString.local()); + assertEquals("host1:10001,host2:10002", specFromString.ensemble()); + assertEquals(2, specFromString.ensembleSize()); + } + } + + private static class Config { + + private final List servers; + + public Config(List servers) { + this.servers = servers; + } + + private static class Server { + + private final String hostname; + private final int port; + + public Server(String hostname, int port) { + this.hostname = hostname; + this.port = port; + } + + public String hostname() { + return hostname; + } + + public int port() { + return port; + } + } + + } + +} diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java index 2bf40c4e2bb..1c7cb3695a8 100644 --- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java @@ -68,22 +68,6 @@ public class CuratorTest { } } - @Test - public void localhost_affinity() { - String localhostHostName = "myhost"; - int localhostPort = 123; - - ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder(); - builder.zookeeperserver(createZKBuilder(localhostHostName, localhostPort)); - builder.zookeeperserver(createZKBuilder("otherhost", 345)); - ConfigserverConfig config = new ConfigserverConfig(builder); - - HostName.setHostNameForTestingOnly(localhostHostName); - - String localhostSpec = localhostHostName + ":" + localhostPort; - assertEquals(localhostSpec, Curator.createConnectionSpecForLocalhost(config)); - } - private ConfigserverConfig createTestConfig() { ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder(); builder.zookeeperserver(createZKBuilder(localhost, port1)); -- cgit v1.2.3 From a7f509b84b99c5b18c147ad81f0a5b06f09507e4 Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Fri, 20 Nov 2020 10:44:47 +0100 Subject: Use CuratorConfig --- zkfacade/abi-spec.json | 1 + .../main/java/com/yahoo/vespa/curator/Curator.java | 19 +++++++++----- .../java/com/yahoo/vespa/curator/CuratorTest.java | 30 ++++++++++++---------- 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/zkfacade/abi-spec.json b/zkfacade/abi-spec.json index e026559b283..4aa8775940e 100644 --- a/zkfacade/abi-spec.json +++ b/zkfacade/abi-spec.json @@ -68,6 +68,7 @@ "methods": [ "public static com.yahoo.vespa.curator.Curator create(java.lang.String)", "public static com.yahoo.vespa.curator.Curator create(java.lang.String, java.util.Optional)", + "public void (com.yahoo.cloud.config.CuratorConfig, com.yahoo.vespa.zookeeper.VespaZooKeeperServer)", "public void (com.yahoo.cloud.config.ConfigserverConfig, com.yahoo.vespa.zookeeper.VespaZooKeeperServer)", "protected void (java.lang.String, java.lang.String, java.util.function.Function)", "public java.lang.String connectionSpec()", diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java index 4aaae38f939..e281cb01f74 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.curator; import com.google.inject.Inject; import com.yahoo.cloud.config.ConfigserverConfig; +import com.yahoo.cloud.config.CuratorConfig; import com.yahoo.io.IOUtils; import com.yahoo.path.Path; import com.yahoo.text.Utf8; @@ -74,19 +75,23 @@ public class Curator implements AutoCloseable { return new Curator(ConnectionSpec.create(connectionSpec), clientConfigFile); } - // Depend on ZooKeeperServer to make sure it is started first - // TODO: This can be removed when this package is no longer public API. @Inject - public Curator(ConfigserverConfig configserverConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) { - this(configserverConfig, Optional.of(ZK_CLIENT_CONFIG_FILE)); + public Curator(CuratorConfig curatorConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) { + // Depends on ZooKeeperServer to make sure it is started first + this(ConnectionSpec.create(curatorConfig.server(), + CuratorConfig.Server::hostname, + CuratorConfig.Server::port, + curatorConfig.zookeeperLocalhostAffinity()), + Optional.of(ZK_CLIENT_CONFIG_FILE)); } - Curator(ConfigserverConfig configserverConfig, Optional clientConfigFile) { + // TODO: This can be removed when this package is no longer public API. + public Curator(ConfigserverConfig configserverConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) { this(ConnectionSpec.create(configserverConfig.zookeeperserver(), ConfigserverConfig.Zookeeperserver::hostname, ConfigserverConfig.Zookeeperserver::port, configserverConfig.zookeeperLocalhostAffinity()), - clientConfigFile); + Optional.of(ZK_CLIENT_CONFIG_FILE)); } protected Curator(String connectionSpec, @@ -95,7 +100,7 @@ public class Curator implements AutoCloseable { this(ConnectionSpec.create(connectionSpec, zooKeeperEnsembleConnectionSpec), curatorFactory, DEFAULT_RETRY_POLICY); } - private Curator(ConnectionSpec connectionSpec, Optional clientConfigFile) { + Curator(ConnectionSpec connectionSpec, Optional clientConfigFile) { this(connectionSpec, (retryPolicy) -> CuratorFrameworkFactory .builder() diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java index 1c7cb3695a8..5341efaefe5 100644 --- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java @@ -1,7 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.curator; -import com.yahoo.cloud.config.ConfigserverConfig; +import com.yahoo.cloud.config.CuratorConfig; import com.yahoo.net.HostName; import org.apache.curator.test.TestingServer; import org.junit.After; @@ -61,29 +61,33 @@ public class CuratorTest { @Test public void require_that_server_count_is_correct() { - ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder(); - builder.zookeeperserver(createZKBuilder(localhost, port1)); - try (Curator curator = createCurator(new ConfigserverConfig(builder))) { + CuratorConfig.Builder builder = new CuratorConfig.Builder(); + builder.server(createZKBuilder(localhost, port1)); + try (Curator curator = createCurator(new CuratorConfig(builder))) { assertEquals(1, curator.zooKeeperEnsembleCount()); } } - private ConfigserverConfig createTestConfig() { - ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder(); - builder.zookeeperserver(createZKBuilder(localhost, port1)); - builder.zookeeperserver(createZKBuilder(localhost, port2)); - return new ConfigserverConfig(builder); + private CuratorConfig createTestConfig() { + CuratorConfig.Builder builder = new CuratorConfig.Builder(); + builder.server(createZKBuilder(localhost, port1)); + builder.server(createZKBuilder(localhost, port2)); + return new CuratorConfig(builder); } - private ConfigserverConfig.Zookeeperserver.Builder createZKBuilder(String hostname, int port) { - ConfigserverConfig.Zookeeperserver.Builder zkBuilder = new ConfigserverConfig.Zookeeperserver.Builder(); + private CuratorConfig.Server.Builder createZKBuilder(String hostname, int port) { + CuratorConfig.Server.Builder zkBuilder = new CuratorConfig.Server.Builder(); zkBuilder.hostname(hostname); zkBuilder.port(port); return zkBuilder; } - private Curator createCurator(ConfigserverConfig configserverConfig) { - return new Curator(configserverConfig, Optional.empty()); + private Curator createCurator(CuratorConfig curatorConfig) { + return new Curator(ConnectionSpec.create(curatorConfig.server(), + CuratorConfig.Server::hostname, + CuratorConfig.Server::port, + curatorConfig.zookeeperLocalhostAffinity()), + Optional.empty()); } private static class PortAllocator { -- cgit v1.2.3 From 607dbfedf2fbf8a2bddb5ffb1e23c796eb92dd5f Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Fri, 20 Nov 2020 11:03:07 +0100 Subject: Refactor mock to simplify Curator constructors --- .../main/java/com/yahoo/vespa/curator/Curator.java | 31 +- .../com/yahoo/vespa/curator/mock/MockCurator.java | 1158 +------------------ .../vespa/curator/mock/MockCuratorFramework.java | 1169 ++++++++++++++++++++ 3 files changed, 1195 insertions(+), 1163 deletions(-) create mode 100644 zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java index e281cb01f74..127eeba71e1 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java @@ -31,6 +31,7 @@ import java.io.File; import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -57,7 +58,7 @@ public class Curator implements AutoCloseable { private static final int MAX_RETRIES = 10; private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES); - protected final RetryPolicy retryPolicy; + protected final RetryPolicy retryPolicy = DEFAULT_RETRY_POLICY; private final CuratorFramework curatorFramework; private final ConnectionSpec connectionSpec; @@ -94,36 +95,28 @@ public class Curator implements AutoCloseable { Optional.of(ZK_CLIENT_CONFIG_FILE)); } - protected Curator(String connectionSpec, - String zooKeeperEnsembleConnectionSpec, - Function curatorFactory) { - this(ConnectionSpec.create(connectionSpec, zooKeeperEnsembleConnectionSpec), curatorFactory, DEFAULT_RETRY_POLICY); + protected Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Function curatorFactory) { + this(ConnectionSpec.create(connectionSpec, zooKeeperEnsembleConnectionSpec), curatorFactory.apply(DEFAULT_RETRY_POLICY)); } Curator(ConnectionSpec connectionSpec, Optional clientConfigFile) { this(connectionSpec, - (retryPolicy) -> CuratorFrameworkFactory + CuratorFrameworkFactory .builder() - .retryPolicy(retryPolicy) + .retryPolicy(DEFAULT_RETRY_POLICY) .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis()) .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis()) .connectString(connectionSpec.local()) .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile))) .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8 - .build(), - DEFAULT_RETRY_POLICY); + .build()); } - private Curator(ConnectionSpec connectionSpec, - Function curatorFactory, - RetryPolicy retryPolicy) { - this.connectionSpec = connectionSpec; - this.retryPolicy = retryPolicy; - this.curatorFramework = curatorFactory.apply(retryPolicy); - if (this.curatorFramework != null) { - addLoggingListener(); - curatorFramework.start(); - } + private Curator(ConnectionSpec connectionSpec, CuratorFramework curatorFramework) { + this.connectionSpec = Objects.requireNonNull(connectionSpec); + this.curatorFramework = Objects.requireNonNull(curatorFramework); + addLoggingListener(); + curatorFramework.start(); } private static ZKClientConfig createClientConfig(Optional clientConfigFile) { diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java index 8e3b433354d..26f1c336874 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java @@ -1,94 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.curator.mock; -import com.google.common.util.concurrent.UncheckedTimeoutException; import com.google.inject.Inject; -import com.yahoo.collections.Pair; -import com.yahoo.concurrent.Lock; -import com.yahoo.concurrent.Locks; import com.yahoo.path.Path; -import com.yahoo.vespa.curator.CompletionTimeoutException; import com.yahoo.vespa.curator.Curator; -import com.yahoo.vespa.curator.recipes.CuratorLockException; -import org.apache.curator.CuratorZookeeperClient; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable; -import org.apache.curator.framework.api.ACLCreateModeBackgroundPathAndBytesable; -import org.apache.curator.framework.api.ACLCreateModePathAndBytesable; -import org.apache.curator.framework.api.ACLPathAndBytesable; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.BackgroundPathAndBytesable; -import org.apache.curator.framework.api.BackgroundPathable; -import org.apache.curator.framework.api.BackgroundVersionable; -import org.apache.curator.framework.api.ChildrenDeletable; -import org.apache.curator.framework.api.CreateBackgroundModeACLable; -import org.apache.curator.framework.api.CreateBuilder; -import org.apache.curator.framework.api.CuratorListener; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.framework.api.DeleteBuilder; -import org.apache.curator.framework.api.ErrorListenerPathAndBytesable; -import org.apache.curator.framework.api.ErrorListenerPathable; -import org.apache.curator.framework.api.ExistsBuilder; -import org.apache.curator.framework.api.ExistsBuilderMain; -import org.apache.curator.framework.api.GetACLBuilder; -import org.apache.curator.framework.api.GetChildrenBuilder; -import org.apache.curator.framework.api.GetDataBuilder; -import org.apache.curator.framework.api.GetDataWatchBackgroundStatable; -import org.apache.curator.framework.api.PathAndBytesable; -import org.apache.curator.framework.api.Pathable; -import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable; -import org.apache.curator.framework.api.SetACLBuilder; -import org.apache.curator.framework.api.SetDataBackgroundVersionable; -import org.apache.curator.framework.api.SetDataBuilder; -import org.apache.curator.framework.api.SyncBuilder; -import org.apache.curator.framework.api.UnhandledErrorListener; -import org.apache.curator.framework.api.VersionPathAndBytesable; -import org.apache.curator.framework.api.WatchPathable; -import org.apache.curator.framework.api.Watchable; -import org.apache.curator.framework.api.transaction.CuratorTransaction; -import org.apache.curator.framework.api.transaction.CuratorTransactionBridge; -import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; -import org.apache.curator.framework.api.transaction.CuratorTransactionResult; -import org.apache.curator.framework.api.transaction.TransactionCheckBuilder; -import org.apache.curator.framework.api.transaction.TransactionCreateBuilder; -import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder; -import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder; -import org.apache.curator.framework.imps.CuratorFrameworkState; -import org.apache.curator.framework.listen.Listenable; -import org.apache.curator.framework.recipes.atomic.AtomicStats; -import org.apache.curator.framework.recipes.atomic.AtomicValue; import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.NodeCacheListener; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.locks.InterProcessLock; -import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.curator.utils.EnsurePath; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; -import java.nio.file.Paths; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import static com.yahoo.vespa.curator.mock.MemoryFileSystem.Node; /** *

A non thread safe mock of the curator API. @@ -105,24 +25,7 @@ import static com.yahoo.vespa.curator.mock.MemoryFileSystem.Node; */ public class MockCurator extends Curator { - public boolean timeoutOnLock = false; - public boolean throwExceptionOnLock = false; - private boolean shouldTimeoutOnEnter = false; - private int monotonicallyIncreasingNumber = 0; - private final boolean stableOrdering; private String zooKeeperEnsembleConnectionSpec = ""; - private final Locks locks = new Locks<>(Long.MAX_VALUE, TimeUnit.DAYS); - - /** The file system used by this mock to store zookeeper files and directories */ - private final MemoryFileSystem fileSystem = new MemoryFileSystem(); - - /** Atomic counters. A more accurate mock would store these as files in the file system */ - private final Map atomicCounters = new ConcurrentHashMap<>(); - - /** Listeners to changes to a particular path */ - private final ListenerMap listeners = new ListenerMap(); - - private final CuratorFramework curatorFramework; /** Creates a mock curator with stable ordering */ @Inject @@ -137,26 +40,24 @@ public class MockCurator extends Curator { * This is not what ZooKeeper does. */ public MockCurator(boolean stableOrdering) { - super("host1:10001", "host1:10001", (retryPolicy) -> null); - this.stableOrdering = stableOrdering; - curatorFramework = new MockCuratorFramework(); - curatorFramework.start(); + super("host1:2181", "host1:2181", (retryPolicy) -> new MockCuratorFramework(stableOrdering, false)); + } + + private MockCuratorFramework mockFramework() { + return (MockCuratorFramework) super.framework(); } /** * Lists the entire content of this curator instance as a multiline string. * Useful for debugging. */ - public String dumpState() { return fileSystem.dumpState(); } - - /** Returns a started curator framework */ - public CuratorFramework framework() { return curatorFramework; } + public String dumpState() { return mockFramework().fileSystem().dumpState(); } /** Returns an atomic counter in this, or empty if no such counter is created */ public Optional counter(String path) { - return Optional.ofNullable(atomicCounters.get(path)); + return Optional.ofNullable(mockFramework().atomicCounters().get(path)); } - + /** * Sets the ZooKeeper ensemble connection spec, which must be on the form * host1:port,host2:port ... @@ -170,1068 +71,37 @@ public class MockCurator extends Curator { return zooKeeperEnsembleConnectionSpec; } - // ----- Start of adaptor methods from Curator to the mock file system ----- - - /** Creates a node below the given directory root */ - private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, Node root, Listeners listeners) - throws KeeperException.NodeExistsException, KeeperException.NoNodeException { - validatePath(pathString); - Path path = Path.fromString(pathString); - if (path.isRoot()) return "/"; // the root already exists - Node parent = root.getNode(Paths.get(path.getParentPath().toString()), createParents); - String name = nodeName(path.getName(), createMode); - - if (parent == null) - throw new KeeperException.NoNodeException(path.getParentPath().toString()); - if (parent.children().containsKey(path.getName())) - throw new KeeperException.NodeExistsException(path.toString()); - - parent.add(name).setContent(content); - String nodePath = "/" + path.getParentPath().toString() + "/" + name; - listeners.notify(Path.fromString(nodePath), content, PathChildrenCacheEvent.Type.CHILD_ADDED); - return nodePath; - } - - /** Deletes a node below the given directory root */ - private void deleteNode(String pathString, boolean deleteChildren, Node root, Listeners listeners) - throws KeeperException.NoNodeException, KeeperException.NotEmptyException { - validatePath(pathString); - Path path = Path.fromString(pathString); - Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false); - if (parent == null) throw new KeeperException.NoNodeException(path.toString()); - Node node = parent.children().get(path.getName()); - if (node == null) throw new KeeperException.NoNodeException(path.getName() + " under " + parent); - if ( ! node.children().isEmpty() && ! deleteChildren) - throw new KeeperException.NotEmptyException(path.toString()); - parent.remove(path.getName()); - listeners.notify(path, new byte[0], PathChildrenCacheEvent.Type.CHILD_REMOVED); - } - - /** Returns the data of a node */ - private byte[] getData(String pathString, Node root) throws KeeperException.NoNodeException { - validatePath(pathString); - return getNode(pathString, root).getContent(); - } - - /** sets the data of an existing node */ - private void setData(String pathString, byte[] content, Node root, Listeners listeners) - throws KeeperException.NoNodeException { - validatePath(pathString); - getNode(pathString, root).setContent(content); - listeners.notify(Path.fromString(pathString), content, PathChildrenCacheEvent.Type.CHILD_UPDATED); - } - - private List getChildren(String path, Node root) throws KeeperException.NoNodeException { - validatePath(path); - Node node = root.getNode(Paths.get(path), false); - if (node == null) throw new KeeperException.NoNodeException(path); - List children = new ArrayList<>(node.children().keySet()); - if (! stableOrdering) - Collections.shuffle(children); - return children; - } - - private boolean exists(String path, Node root) { - validatePath(path); - Node parent = root.getNode(Paths.get(Path.fromString(path).getParentPath().toString()), false); - if (parent == null) return false; - Node node = parent.children().get(Path.fromString(path).getName()); - return node != null; - } - - /** Returns a node or throws the appropriate exception if it doesn't exist */ - private Node getNode(String pathString, Node root) throws KeeperException.NoNodeException { - validatePath(pathString); - Path path = Path.fromString(pathString); - Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false); - if (parent == null) throw new KeeperException.NoNodeException(path.toString()); - Node node = parent.children().get(path.getName()); - if (node == null) throw new KeeperException.NoNodeException(path.toString()); - return node; - } - - private String nodeName(String baseName, CreateMode createMode) { - switch (createMode) { - case PERSISTENT: case EPHEMERAL: return baseName; - case PERSISTENT_SEQUENTIAL: case EPHEMERAL_SEQUENTIAL: return baseName + monotonicallyIncreasingNumber++; - default: throw new UnsupportedOperationException(createMode + " support not implemented in MockCurator"); - } - } - - /** Validates a path using the same rules as ZooKeeper */ - public static String validatePath(String path) throws IllegalArgumentException { - if (path == null) throw new IllegalArgumentException("Path cannot be null"); - if (path.length() == 0) throw new IllegalArgumentException("Path length must be > 0"); - if (path.charAt(0) != '/') throw new IllegalArgumentException("Path must start with / character"); - if (path.length() == 1) return path; // done checking - it's the root - if (path.charAt(path.length() - 1) == '/') - throw new IllegalArgumentException("Path must not end with / character"); - - String reason = null; - char lastc = '/'; - char chars[] = path.toCharArray(); - char c; - for (int i = 1; i < chars.length; lastc = chars[i], i++) { - c = chars[i]; - - if (c == 0) { - reason = "null character not allowed @" + i; - break; - } else if (c == '/' && lastc == '/') { - reason = "empty node name specified @" + i; - break; - } else if (c == '.' && lastc == '.') { - if (chars[i-2] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) { - reason = "relative paths not allowed @" + i; - break; - } - } else if (c == '.') { - if (chars[i-1] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) { - reason = "relative paths not allowed @" + i; - break; - } - } else if (c > '\u0000' && c < '\u001f' || c > '\u007f' && c < '\u009F' - || c > '\ud800' && c < '\uf8ff' || c > '\ufff0' && c < '\uffff') { - reason = "invalid charater @" + i; - break; - } - } - - if (reason != null) - throw new IllegalArgumentException("Invalid path string \"" + path + "\" caused by " + reason); - return path; - } - - // ----- Mock of Curator recipes accessed through our Curator interface ----- - @Override public DistributedAtomicLong createAtomicCounter(String path) { - MockAtomicCounter counter = atomicCounters.get(path); - if (counter == null) { - counter = new MockAtomicCounter(path); - atomicCounters.put(path, counter); - } - return counter; + return mockFramework().createAtomicCounter(path); } - /** Create a mutex which ensures exclusive access within this single vm */ @Override public InterProcessLock createMutex(String path) { - return new MockLock(path); - } - - public MockCurator timeoutBarrierOnEnter(boolean shouldTimeout) { - shouldTimeoutOnEnter = shouldTimeout; - return this; + return mockFramework().createMutex(path); } @Override public CompletionWaiter getCompletionWaiter(Path parentPath, int numMembers, String id) { - return new MockCompletionWaiter(); + return mockFramework().createCompletionWaiter(); } @Override public CompletionWaiter createCompletionWaiter(Path parentPath, String waiterNode, int numMembers, String id) { - return new MockCompletionWaiter(); + return mockFramework().createCompletionWaiter(); } @Override public DirectoryCache createDirectoryCache(String path, boolean cacheData, boolean dataIsCompressed, ExecutorService executorService) { - return new MockDirectoryCache(Path.fromString(path)); + return mockFramework().createDirectoryCache(path); } @Override public FileCache createFileCache(String path, boolean dataIsCompressed) { - return new MockFileCache(Path.fromString(path)); + return mockFramework().createFileCache(path); } @Override public int zooKeeperEnsembleCount() { return 1; } - /** - * Invocation of changes to the file system state is abstracted through this to allow transactional - * changes to notify on commit - */ - private abstract class Listeners { - - /** Translating method */ - public final void notify(Path path, byte[] data, PathChildrenCacheEvent.Type type) { - String pathString = "/" + path.toString(); // this silly path class strips the leading "/" :-/ - PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, new ChildData(pathString, null, data)); - notify(path, event); - } - - public abstract void notify(Path path, PathChildrenCacheEvent event); - - } - - /** The regular listener implementation which notifies registered file and directory listeners */ - private class ListenerMap extends Listeners { - - private final Map directoryListeners = new ConcurrentHashMap<>(); - private final Map fileListeners = new ConcurrentHashMap<>(); - - public void add(Path path, PathChildrenCacheListener listener) { - directoryListeners.put(path, listener); - } - - public void add(Path path, NodeCacheListener listener) { - fileListeners.put(path, listener); - } - - @Override - public void notify(Path path, PathChildrenCacheEvent event) { - try { - // Snapshot directoryListeners in case notification leads to new directoryListeners added - Set> directoryListenerSnapshot = new HashSet<>(directoryListeners.entrySet()); - for (Map.Entry listener : directoryListenerSnapshot) { - if (path.isChildOf(listener.getKey())) - listener.getValue().childEvent(curatorFramework, event); - } - - // Snapshot directoryListeners in case notification leads to new directoryListeners added - Set> fileListenerSnapshot = new HashSet<>(fileListeners.entrySet()); - for (Map.Entry listener : fileListenerSnapshot) { - if (path.equals(listener.getKey())) - listener.getValue().nodeChanged(); - } - } - catch (Exception e) { - e.printStackTrace(); // TODO: Remove - throw new RuntimeException("Exception notifying listeners", e); - } - } - - } - - private class MockCompletionWaiter implements CompletionWaiter { - - @Override - public void awaitCompletion(Duration timeout) { - if (shouldTimeoutOnEnter) { - throw new CompletionTimeoutException(""); - } - } - - @Override - public void notifyCompletion() { - } - - } - - /** A lock which works inside a single vm */ - private class MockLock extends InterProcessSemaphoreMutex { - - private final String path; - - private Lock lock = null; - - public MockLock(String path) { - super(curatorFramework, path); - this.path = path; - } - - @Override - public boolean acquire(long timeout, TimeUnit unit) { - if (throwExceptionOnLock) - throw new CuratorLockException("Thrown by mock"); - if (timeoutOnLock) return false; - - try { - lock = locks.lock(path, timeout, unit); - return true; - } - catch (UncheckedTimeoutException e) { - return false; - } - } - - @Override - public void acquire() { - if (throwExceptionOnLock) - throw new CuratorLockException("Thrown by mock"); - - lock = locks.lock(path); - } - - @Override - public void release() { - if (lock != null) - lock.close(); - } - - } - - private class MockAtomicCounter extends DistributedAtomicLong { - - private boolean initialized = false; - private MockLongValue value = new MockLongValue(0); // yes, uninitialized returns 0 :-/ - - public MockAtomicCounter(String path) { - super(curatorFramework, path, retryPolicy); - } - - @Override - public boolean initialize(Long value) { - if (initialized) return false; - this.value = new MockLongValue(value); - initialized = true; - return true; - } - - @Override - public AtomicValue get() { - if (value == null) return new MockLongValue(0); - return value; - } - - public AtomicValue add(Long delta) throws Exception { - return trySet(value.postValue() + delta); - } - - public AtomicValue subtract(Long delta) throws Exception { - return trySet(value.postValue() - delta); - } - - @Override - public AtomicValue increment() { - return trySet(value.postValue() + 1); - } - - public AtomicValue decrement() throws Exception { - return trySet(value.postValue() - 1); - } - - @Override - public AtomicValue trySet(Long longval) { - value = new MockLongValue(longval); - return value; - } - - public void forceSet(Long newValue) throws Exception { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - public AtomicValue compareAndSet(Long expectedValue, Long newValue) throws Exception { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - } - - private class MockLongValue implements AtomicValue { - - private AtomicLong value = new AtomicLong(); - - public MockLongValue(long value) { - this.value.set(value); - } - - @Override - public boolean succeeded() { - return true; - } - - public void setValue(long value) { - this.value.set(value); - } - - @Override - public Long preValue() { - return value.get(); - } - - @Override - public Long postValue() { - return value.get(); - } - - @Override - public AtomicStats getStats() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - } - - private class MockDirectoryCache implements DirectoryCache { - - /** The path this is caching and listening to */ - private Path path; - - public MockDirectoryCache(Path path) { - this.path = path; - } - - @Override - public void start() {} - - @Override - public void addListener(PathChildrenCacheListener listener) { - listeners.add(path, listener); - } - - @Override - public List getCurrentData() { - List childData = new ArrayList<>(); - for (String childName : getChildren(path)) { - Path childPath = path.append(childName); - childData.add(new ChildData(childPath.getAbsolute(), null, getData(childPath).get())); - } - return childData; - } - - @Override - public ChildData getCurrentData(Path fullPath) { - if (!fullPath.getParentPath().equals(path)) { - throw new IllegalArgumentException("Path '" + fullPath + "' is not a child path of '" + path + "'"); - } - - return getData(fullPath).map(bytes -> new ChildData(fullPath.getAbsolute(), null, bytes)).orElse(null); - } - - private void collectData(Node parent, Path parentPath, List data) { - for (Node child : parent.children().values()) { - Path childPath = parentPath.append(child.name()); - data.add(new ChildData("/" + childPath.toString(), null, child.getContent())); - } - } - - @Override - public void close() {} - - } - - private class MockFileCache implements FileCache { - - /** The path this is caching and listening to */ - private Path path; - - public MockFileCache(Path path) { - this.path = path; - } - - @Override - public void start() {} - - @Override - public void addListener(NodeCacheListener listener) { - listeners.add(path, listener); - } - - @Override - public ChildData getCurrentData() { - Node node = fileSystem.root().getNode(Paths.get(path.toString()), false); - if (node == null) return null; - return new ChildData("/" + path.toString(), null, node.getContent()); - } - - @Override - public void close() {} - - } - - // ----- The rest of this file is adapting the Curator (non-recipe) API to the ----- - // ----- file system methods above. ----- - // ----- There's nothing to see unless you are interested in an illustration of ----- - // ----- the folly of fluent API's or, more generally, mankind. ----- - - private abstract class MockBackgroundACLPathAndBytesableBuilder implements PathAndBytesable, ProtectACLCreateModePathAndBytesable { - - public BackgroundPathAndBytesable withACL(List list) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - public ACLBackgroundPathAndBytesable withMode(CreateMode createMode) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ACLCreateModeBackgroundPathAndBytesable withProtection() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - public T forPath(String s, byte[] bytes) throws Exception { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - public T forPath(String s) throws Exception { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - } - - private class MockCreateBuilder extends MockBackgroundACLPathAndBytesableBuilder implements CreateBuilder { - - private boolean createParents = false; - private CreateMode createMode = CreateMode.PERSISTENT; - - @Override - public ProtectACLCreateModePathAndBytesable creatingParentsIfNeeded() { - createParents = true; - return this; - } - - @Override - public ACLCreateModeBackgroundPathAndBytesable withProtection() { - // Protection against the server crashing after creating the file but before returning to the client. - // Not relevant for an in-memory mock, obviously - return this; - } - - public ACLBackgroundPathAndBytesable withMode(CreateMode createMode) { - this.createMode = createMode; - return this; - } - - @Override - public CreateBackgroundModeACLable compressed() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ProtectACLCreateModePathAndBytesable creatingParentContainersIfNeeded() { - // TODO: Add proper support for container nodes, see https://issues.apache.org/jira/browse/ZOOKEEPER-2163. - return creatingParentsIfNeeded(); - } - - @Override - @Deprecated - public ACLPathAndBytesable withProtectedEphemeralSequential() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - public String forPath(String s) throws Exception { - return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners); - } - - public String forPath(String s, byte[] bytes) throws Exception { - return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners); - } - - @Override - public ErrorListenerPathAndBytesable inBackground() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable inBackground(Object o) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback, Object o) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback, Executor executor) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - } - - private class MockBackgroundPathableBuilder implements BackgroundPathable, Watchable> { - - @Override - public ErrorListenerPathable inBackground() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable inBackground(Object o) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable inBackground(BackgroundCallback backgroundCallback) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable inBackground(BackgroundCallback backgroundCallback, Object o) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable inBackground(BackgroundCallback backgroundCallback, Executor executor) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathable inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public T forPath(String s) throws Exception { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public BackgroundPathable watched() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public BackgroundPathable usingWatcher(Watcher watcher) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public BackgroundPathable usingWatcher(CuratorWatcher curatorWatcher) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - } - - private class MockGetChildrenBuilder extends MockBackgroundPathableBuilder> implements GetChildrenBuilder { - - @Override - public WatchPathable> storingStatIn(Stat stat) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public List forPath(String path) throws Exception { - return getChildren(path, fileSystem.root()); - } - - } - - private class MockExistsBuilder extends MockBackgroundPathableBuilder implements ExistsBuilder { - - @Override - public Stat forPath(String path) throws Exception { - try { - Node node = getNode(path, fileSystem.root()); - Stat stat = new Stat(); - stat.setVersion(node.version()); - return stat; - } - catch (KeeperException.NoNodeException e) { - return null; - } - } - - @Override - public ExistsBuilderMain creatingParentContainersIfNeeded() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - } - - private class MockDeleteBuilder extends MockBackgroundPathableBuilder implements DeleteBuilder { - - private boolean deleteChildren = false; - - @Override - public BackgroundVersionable deletingChildrenIfNeeded() { - deleteChildren = true; - return this; - } - - @Override - public ChildrenDeletable guaranteed() { - return this; - } - - @Override - public BackgroundPathable withVersion(int i) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - public Void forPath(String pathString) throws Exception { - deleteNode(pathString, deleteChildren, fileSystem.root(), listeners); - return null; - } - - } - - private class MockGetDataBuilder extends MockBackgroundPathableBuilder implements GetDataBuilder { - - @Override - public GetDataWatchBackgroundStatable decompressed() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public WatchPathable storingStatIn(Stat stat) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - public byte[] forPath(String path) throws Exception { - return getData(path, fileSystem.root()); - } - - } - - private class MockSetDataBuilder extends MockBackgroundACLPathAndBytesableBuilder implements SetDataBuilder { - - @Override - public SetDataBackgroundVersionable compressed() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public BackgroundPathAndBytesable withVersion(int i) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public Stat forPath(String path, byte[] bytes) throws Exception { - setData(path, bytes, fileSystem.root(), listeners); - return null; - } - - @Override - public ErrorListenerPathAndBytesable inBackground() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable inBackground(Object o) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback, Object o) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback, Executor executor) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - } - - /** Allows addition of directoryListeners which are never called */ - private class MockListenable implements Listenable { - - @Override - public void addListener(T t) { - } - - @Override - public void addListener(T t, Executor executor) { - } - - @Override - public void removeListener(T t) { - } - - } - - private class MockCuratorTransactionFinal implements CuratorTransactionFinal { - - /** The new directory root in which the transactional changes are made */ - private Node newRoot; - - private boolean committed = false; - - private final DelayedListener delayedListener = new DelayedListener(); - - public MockCuratorTransactionFinal() { - newRoot = fileSystem.root().clone(); - } - - @Override - public Collection commit() throws Exception { - fileSystem.replaceRoot(newRoot); - committed = true; - delayedListener.commit(); - return null; // TODO - } - - @Override - public TransactionCreateBuilder create() { - ensureNotCommitted(); - return new MockTransactionCreateBuilder(); - } - - @Override - public TransactionDeleteBuilder delete() { - ensureNotCommitted(); - return new MockTransactionDeleteBuilder(); - } - - @Override - public TransactionSetDataBuilder setData() { - ensureNotCommitted(); - return new MockTransactionSetDataBuilder(); - } - - @Override - public TransactionCheckBuilder check() { - ensureNotCommitted(); - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - private void ensureNotCommitted() { - if (committed) throw new IllegalStateException("transaction already committed"); - } - - private class MockTransactionCreateBuilder implements TransactionCreateBuilder { - - private CreateMode createMode = CreateMode.PERSISTENT; - - @Override - public PathAndBytesable withACL(List list) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ACLCreateModePathAndBytesable compressed() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public ACLPathAndBytesable withMode(CreateMode createMode) { - this.createMode = createMode; - return this; - } - - @Override - public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception { - createNode(s, bytes, false, createMode, newRoot, delayedListener); - return new MockCuratorTransactionBridge(); - } - - @Override - public CuratorTransactionBridge forPath(String s) throws Exception { - createNode(s, new byte[0], false, createMode, newRoot, delayedListener); - return new MockCuratorTransactionBridge(); - } - - } - - private class MockTransactionDeleteBuilder implements TransactionDeleteBuilder { - - @Override - public Pathable withVersion(int i) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public CuratorTransactionBridge forPath(String path) throws Exception { - deleteNode(path, false, newRoot, delayedListener); - return new MockCuratorTransactionBridge(); - } - - } - - private class MockTransactionSetDataBuilder implements TransactionSetDataBuilder { - - @Override - public VersionPathAndBytesable compressed() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public PathAndBytesable withVersion(int i) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception { - MockCurator.this.setData(s, bytes, newRoot, delayedListener); - return new MockCuratorTransactionBridge(); - } - - @Override - public CuratorTransactionBridge forPath(String s) throws Exception { - MockCurator.this.setData(s, new byte[0], newRoot, delayedListener); - return new MockCuratorTransactionBridge(); - } - - } - - private class MockCuratorTransactionBridge implements CuratorTransactionBridge { - - @Override - public CuratorTransactionFinal and() { - return MockCuratorTransactionFinal.this; - } - - } - - /** A class which collects listen events and forwards them to the regular directoryListeners on commit */ - private class DelayedListener extends Listeners { - - private final List> events = new ArrayList<>(); - - @Override - public void notify(Path path, PathChildrenCacheEvent event) { - events.add(new Pair<>(path, event)); - } - - public void commit() { - for (Pair event : events) - listeners.notify(event.getFirst(), event.getSecond()); - } - - } - - } - - private class MockCuratorFramework implements CuratorFramework { - - private CuratorFrameworkState curatorState = CuratorFrameworkState.LATENT; - - @Override - public void start() { - curatorState = CuratorFrameworkState.STARTED; - } - - @Override - public void close() { - curatorState = CuratorFrameworkState.STOPPED; - } - - @Override - public CuratorFrameworkState getState() { - return curatorState; - } - - @Override - @Deprecated - public boolean isStarted() { - return curatorState == CuratorFrameworkState.STARTED; - } - - @Override - public CreateBuilder create() { - return new MockCreateBuilder(); - } - - @Override - public DeleteBuilder delete() { - return new MockDeleteBuilder(); - } - - @Override - public ExistsBuilder checkExists() { - return new MockExistsBuilder(); - } - - @Override - public GetDataBuilder getData() { - return new MockGetDataBuilder(); - } - - @Override - public SetDataBuilder setData() { - return new MockSetDataBuilder(); - } - - @Override - public GetChildrenBuilder getChildren() { - return new MockGetChildrenBuilder(); - } - - @Override - public GetACLBuilder getACL() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public SetACLBuilder setACL() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public CuratorTransaction inTransaction() { - return new MockCuratorTransactionFinal(); - } - - @Override - @Deprecated - public void sync(String path, Object backgroundContextObject) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public void createContainers(String s) throws Exception { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public Listenable getConnectionStateListenable() { - return new MockListenable<>(); - } - - @Override - public Listenable getCuratorListenable() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public Listenable getUnhandledErrorListenable() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - @Deprecated - public CuratorFramework nonNamespaceView() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public CuratorFramework usingNamespace(String newNamespace) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public String getNamespace() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public CuratorZookeeperClient getZookeeperClient() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Deprecated - @Override - public EnsurePath newNamespaceAwareEnsurePath(String path) { - return new EnsurePath(path); - } - - @Override - public void clearWatcherReferences(Watcher watcher) { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - @Override - public boolean blockUntilConnected(int i, TimeUnit timeUnit) throws InterruptedException { - return true; - } - - @Override - public void blockUntilConnected() throws InterruptedException { - - } - - @Override - public SyncBuilder sync() { - throw new UnsupportedOperationException("Not implemented in MockCurator"); - } - - } - } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java new file mode 100644 index 00000000000..9a845e56bfd --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCuratorFramework.java @@ -0,0 +1,1169 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator.mock; + +import com.google.common.util.concurrent.UncheckedTimeoutException; +import com.yahoo.collections.Pair; +import com.yahoo.concurrent.Lock; +import com.yahoo.concurrent.Locks; +import com.yahoo.path.Path; +import com.yahoo.vespa.curator.CompletionTimeoutException; +import com.yahoo.vespa.curator.Curator; +import com.yahoo.vespa.curator.recipes.CuratorLockException; +import org.apache.curator.CuratorZookeeperClient; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable; +import org.apache.curator.framework.api.ACLCreateModeBackgroundPathAndBytesable; +import org.apache.curator.framework.api.ACLCreateModePathAndBytesable; +import org.apache.curator.framework.api.ACLPathAndBytesable; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.BackgroundPathAndBytesable; +import org.apache.curator.framework.api.BackgroundPathable; +import org.apache.curator.framework.api.BackgroundVersionable; +import org.apache.curator.framework.api.ChildrenDeletable; +import org.apache.curator.framework.api.CreateBackgroundModeACLable; +import org.apache.curator.framework.api.CreateBuilder; +import org.apache.curator.framework.api.CuratorListener; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.DeleteBuilder; +import org.apache.curator.framework.api.ErrorListenerPathAndBytesable; +import org.apache.curator.framework.api.ErrorListenerPathable; +import org.apache.curator.framework.api.ExistsBuilder; +import org.apache.curator.framework.api.ExistsBuilderMain; +import org.apache.curator.framework.api.GetACLBuilder; +import org.apache.curator.framework.api.GetChildrenBuilder; +import org.apache.curator.framework.api.GetDataBuilder; +import org.apache.curator.framework.api.GetDataWatchBackgroundStatable; +import org.apache.curator.framework.api.PathAndBytesable; +import org.apache.curator.framework.api.Pathable; +import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable; +import org.apache.curator.framework.api.SetACLBuilder; +import org.apache.curator.framework.api.SetDataBackgroundVersionable; +import org.apache.curator.framework.api.SetDataBuilder; +import org.apache.curator.framework.api.SyncBuilder; +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.framework.api.VersionPathAndBytesable; +import org.apache.curator.framework.api.WatchPathable; +import org.apache.curator.framework.api.Watchable; +import org.apache.curator.framework.api.transaction.CuratorTransaction; +import org.apache.curator.framework.api.transaction.CuratorTransactionBridge; +import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; +import org.apache.curator.framework.api.transaction.CuratorTransactionResult; +import org.apache.curator.framework.api.transaction.TransactionCheckBuilder; +import org.apache.curator.framework.api.transaction.TransactionCreateBuilder; +import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder; +import org.apache.curator.framework.api.transaction.TransactionSetDataBuilder; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.recipes.atomic.AtomicStats; +import org.apache.curator.framework.recipes.atomic.AtomicValue; +import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.recipes.locks.InterProcessLock; +import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryForever; +import org.apache.curator.utils.EnsurePath; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A mock implementation of{@link CuratorFramework} for testing purposes. + * + * @author mpolden + */ +public class MockCuratorFramework implements CuratorFramework { + + private final boolean shouldTimeoutOnEnter; + private final boolean stableOrdering; + private final Locks locks = new Locks<>(Long.MAX_VALUE, TimeUnit.DAYS); + + /** The file system used by this mock to store zookeeper files and directories */ + private final MemoryFileSystem fileSystem = new MemoryFileSystem(); + + /** Atomic counters. A more accurate mock would store these as files in the file system */ + private final Map atomicCounters = new ConcurrentHashMap<>(); + + /** Listeners to changes to a particular path */ + private final ListenerMap listeners = new ListenerMap(); + + private CuratorFrameworkState curatorState = CuratorFrameworkState.LATENT; + private int monotonicallyIncreasingNumber = 0; + + public MockCuratorFramework(boolean stableOrdering, boolean shouldTimeoutOnEnter) { + this.stableOrdering = stableOrdering; + this.shouldTimeoutOnEnter = shouldTimeoutOnEnter; + } + + public Map atomicCounters() { + return Collections.unmodifiableMap(atomicCounters); + } + + public MemoryFileSystem fileSystem() { + return fileSystem; + } + + @Override + public void start() { + curatorState = CuratorFrameworkState.STARTED; + } + + @Override + public void close() { + curatorState = CuratorFrameworkState.STOPPED; + } + + @Override + public CuratorFrameworkState getState() { + return curatorState; + } + + @Override + @Deprecated + public boolean isStarted() { + return curatorState == CuratorFrameworkState.STARTED; + } + + @Override + public CreateBuilder create() { + return new MockCreateBuilder(); + } + + @Override + public DeleteBuilder delete() { + return new MockDeleteBuilder(); + } + + @Override + public ExistsBuilder checkExists() { + return new MockExistsBuilder(); + } + + @Override + public GetDataBuilder getData() { + return new MockGetDataBuilder(); + } + + @Override + public SetDataBuilder setData() { + return new MockSetDataBuilder(); + } + + @Override + public GetChildrenBuilder getChildren() { + return new MockGetChildrenBuilder(); + } + + @Override + public GetACLBuilder getACL() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public SetACLBuilder setACL() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public CuratorTransaction inTransaction() { + return new MockCuratorTransactionFinal(); + } + + @Override + @Deprecated + public void sync(String path, Object backgroundContextObject) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public void createContainers(String s) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public Listenable getConnectionStateListenable() { + return new MockListenable<>(); + } + + @Override + public Listenable getCuratorListenable() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public Listenable getUnhandledErrorListenable() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + @Deprecated + public CuratorFramework nonNamespaceView() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public CuratorFramework usingNamespace(String newNamespace) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public String getNamespace() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public CuratorZookeeperClient getZookeeperClient() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Deprecated + @Override + public EnsurePath newNamespaceAwareEnsurePath(String path) { + return new EnsurePath(path); + } + + @Override + public void clearWatcherReferences(Watcher watcher) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public boolean blockUntilConnected(int i, TimeUnit timeUnit) { + return true; + } + + @Override + public void blockUntilConnected() { + } + + @Override + public SyncBuilder sync() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + // ----- Factory methods for mocks */ + + public InterProcessLock createMutex(String path) { + return new MockCuratorFramework.MockLock(path); + } + + public MockAtomicCounter createAtomicCounter(String path) { + return atomicCounters.computeIfAbsent(path, (k) -> new MockAtomicCounter(path)); + } + + public Curator.CompletionWaiter createCompletionWaiter() { + return new MockCuratorFramework.MockCompletionWaiter(); + } + + public Curator.DirectoryCache createDirectoryCache(String path) { + return new MockDirectoryCache(Path.fromString(path)); + } + + public Curator.FileCache createFileCache(String path) { + return new MockFileCache(Path.fromString(path)); + } + + // ----- Start of adaptor methods from Curator to the mock file system ----- + + /** Creates a node below the given directory root */ + private String createNode(String pathString, byte[] content, boolean createParents, CreateMode createMode, MemoryFileSystem.Node root, Listeners listeners) + throws KeeperException.NodeExistsException, KeeperException.NoNodeException { + validatePath(pathString); + Path path = Path.fromString(pathString); + if (path.isRoot()) return "/"; // the root already exists + MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), createParents); + String name = nodeName(path.getName(), createMode); + + if (parent == null) + throw new KeeperException.NoNodeException(path.getParentPath().toString()); + if (parent.children().containsKey(path.getName())) + throw new KeeperException.NodeExistsException(path.toString()); + + parent.add(name).setContent(content); + String nodePath = "/" + path.getParentPath().toString() + "/" + name; + listeners.notify(Path.fromString(nodePath), content, PathChildrenCacheEvent.Type.CHILD_ADDED); + return nodePath; + } + + /** Deletes a node below the given directory root */ + private void deleteNode(String pathString, boolean deleteChildren, MemoryFileSystem.Node root, Listeners listeners) + throws KeeperException.NoNodeException, KeeperException.NotEmptyException { + validatePath(pathString); + Path path = Path.fromString(pathString); + MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false); + if (parent == null) throw new KeeperException.NoNodeException(path.toString()); + MemoryFileSystem.Node node = parent.children().get(path.getName()); + if (node == null) throw new KeeperException.NoNodeException(path.getName() + " under " + parent); + if ( ! node.children().isEmpty() && ! deleteChildren) + throw new KeeperException.NotEmptyException(path.toString()); + parent.remove(path.getName()); + listeners.notify(path, new byte[0], PathChildrenCacheEvent.Type.CHILD_REMOVED); + } + + /** Returns the data of a node */ + private byte[] getData(String pathString, MemoryFileSystem.Node root) throws KeeperException.NoNodeException { + validatePath(pathString); + return getNode(pathString, root).getContent(); + } + + /** sets the data of an existing node */ + private void setData(String pathString, byte[] content, MemoryFileSystem.Node root, Listeners listeners) + throws KeeperException.NoNodeException { + validatePath(pathString); + getNode(pathString, root).setContent(content); + listeners.notify(Path.fromString(pathString), content, PathChildrenCacheEvent.Type.CHILD_UPDATED); + } + + private List getChildren(String path, MemoryFileSystem.Node root) throws KeeperException.NoNodeException { + validatePath(path); + MemoryFileSystem.Node node = root.getNode(Paths.get(path), false); + if (node == null) throw new KeeperException.NoNodeException(path); + List children = new ArrayList<>(node.children().keySet()); + if (! stableOrdering) + Collections.shuffle(children); + return children; + } + + /** Returns a node or throws the appropriate exception if it doesn't exist */ + private MemoryFileSystem.Node getNode(String pathString, MemoryFileSystem.Node root) throws KeeperException.NoNodeException { + validatePath(pathString); + Path path = Path.fromString(pathString); + MemoryFileSystem.Node parent = root.getNode(Paths.get(path.getParentPath().toString()), false); + if (parent == null) throw new KeeperException.NoNodeException(path.toString()); + MemoryFileSystem.Node node = parent.children().get(path.getName()); + if (node == null) throw new KeeperException.NoNodeException(path.toString()); + return node; + } + + private String nodeName(String baseName, CreateMode createMode) { + switch (createMode) { + case PERSISTENT: case EPHEMERAL: return baseName; + case PERSISTENT_SEQUENTIAL: case EPHEMERAL_SEQUENTIAL: return baseName + monotonicallyIncreasingNumber++; + default: throw new UnsupportedOperationException(createMode + " support not implemented in MockCurator"); + } + } + + /** Validates a path using the same rules as ZooKeeper */ + public static String validatePath(String path) throws IllegalArgumentException { + if (path == null) throw new IllegalArgumentException("Path cannot be null"); + if (path.length() == 0) throw new IllegalArgumentException("Path length must be > 0"); + if (path.charAt(0) != '/') throw new IllegalArgumentException("Path must start with / character"); + if (path.length() == 1) return path; // done checking - it's the root + if (path.charAt(path.length() - 1) == '/') + throw new IllegalArgumentException("Path must not end with / character"); + + String reason = null; + char lastc = '/'; + char[] chars = path.toCharArray(); + char c; + for (int i = 1; i < chars.length; lastc = chars[i], i++) { + c = chars[i]; + + if (c == 0) { + reason = "null character not allowed @" + i; + break; + } else if (c == '/' && lastc == '/') { + reason = "empty node name specified @" + i; + break; + } else if (c == '.' && lastc == '.') { + if (chars[i-2] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) { + reason = "relative paths not allowed @" + i; + break; + } + } else if (c == '.') { + if (chars[i-1] == '/' && ((i + 1 == chars.length) || chars[i+1] == '/')) { + reason = "relative paths not allowed @" + i; + break; + } + } else if (c > '\u0000' && c < '\u001f' || c > '\u007f' && c < '\u009F' + || c > '\ud800' && c < '\uf8ff' || c > '\ufff0' && c < '\uffff') { + reason = "invalid charater @" + i; + break; + } + } + + if (reason != null) + throw new IllegalArgumentException("Invalid path string \"" + path + "\" caused by " + reason); + return path; + } + + /** + * Invocation of changes to the file system state is abstracted through this to allow transactional + * changes to notify on commit + */ + private abstract static class Listeners { + + /** Translating method */ + public final void notify(Path path, byte[] data, PathChildrenCacheEvent.Type type) { + String pathString = "/" + path.toString(); // this silly path class strips the leading "/" :-/ + PathChildrenCacheEvent event = new PathChildrenCacheEvent(type, new ChildData(pathString, null, data)); + notify(path, event); + } + + public abstract void notify(Path path, PathChildrenCacheEvent event); + + } + + /** The regular listener implementation which notifies registered file and directory listeners */ + private class ListenerMap extends Listeners { + + private final Map directoryListeners = new ConcurrentHashMap<>(); + private final Map fileListeners = new ConcurrentHashMap<>(); + + public void add(Path path, PathChildrenCacheListener listener) { + directoryListeners.put(path, listener); + } + + public void add(Path path, NodeCacheListener listener) { + fileListeners.put(path, listener); + } + + @Override + public void notify(Path path, PathChildrenCacheEvent event) { + try { + // Snapshot directoryListeners in case notification leads to new directoryListeners added + Set> directoryListenerSnapshot = new HashSet<>(directoryListeners.entrySet()); + for (Map.Entry listener : directoryListenerSnapshot) { + if (path.isChildOf(listener.getKey())) + listener.getValue().childEvent(MockCuratorFramework.this, event); + } + + // Snapshot directoryListeners in case notification leads to new directoryListeners added + Set> fileListenerSnapshot = new HashSet<>(fileListeners.entrySet()); + for (Map.Entry listener : fileListenerSnapshot) { + if (path.equals(listener.getKey())) + listener.getValue().nodeChanged(); + } + } + catch (Exception e) { + e.printStackTrace(); // TODO: Remove + throw new RuntimeException("Exception notifying listeners", e); + } + } + + } + + private class MockCompletionWaiter implements Curator.CompletionWaiter { + + @Override + public void awaitCompletion(Duration timeout) { + if (shouldTimeoutOnEnter) { + throw new CompletionTimeoutException(""); + } + } + + @Override + public void notifyCompletion() { + } + + } + + /** A lock which works inside a single vm */ + private class MockLock extends InterProcessSemaphoreMutex { + + public boolean timeoutOnLock = false; + public boolean throwExceptionOnLock = false; + + private final String path; + + private Lock lock = null; + + public MockLock(String path) { + super(MockCuratorFramework.this, path); + this.path = path; + } + + @Override + public boolean acquire(long timeout, TimeUnit unit) { + if (throwExceptionOnLock) + throw new CuratorLockException("Thrown by mock"); + if (timeoutOnLock) return false; + + try { + lock = locks.lock(path, timeout, unit); + return true; + } + catch (UncheckedTimeoutException e) { + return false; + } + } + + @Override + public void acquire() { + if (throwExceptionOnLock) + throw new CuratorLockException("Thrown by mock"); + + lock = locks.lock(path); + } + + @Override + public void release() { + if (lock != null) + lock.close(); + } + + } + + private class MockAtomicCounter extends DistributedAtomicLong { + + private boolean initialized = false; + private MockLongValue value = new MockLongValue(0); // yes, uninitialized returns 0 :-/ + + public MockAtomicCounter(String path) { + super(MockCuratorFramework.this, path, new RetryForever(1_000)); + } + + @Override + public boolean initialize(Long value) { + if (initialized) return false; + this.value = new MockLongValue(value); + initialized = true; + return true; + } + + @Override + public AtomicValue get() { + if (value == null) return new MockLongValue(0); + return value; + } + + public AtomicValue add(Long delta) { + return trySet(value.postValue() + delta); + } + + public AtomicValue subtract(Long delta) { + return trySet(value.postValue() - delta); + } + + @Override + public AtomicValue increment() { + return trySet(value.postValue() + 1); + } + + public AtomicValue decrement() { + return trySet(value.postValue() - 1); + } + + @Override + public AtomicValue trySet(Long longval) { + value = new MockLongValue(longval); + return value; + } + + public void forceSet(Long newValue) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + public AtomicValue compareAndSet(Long expectedValue, Long newValue) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + } + + private static class MockLongValue implements AtomicValue { + + private final AtomicLong value = new AtomicLong(); + + public MockLongValue(long value) { + this.value.set(value); + } + + @Override + public boolean succeeded() { + return true; + } + + public void setValue(long value) { + this.value.set(value); + } + + @Override + public Long preValue() { + return value.get(); + } + + @Override + public Long postValue() { + return value.get(); + } + + @Override + public AtomicStats getStats() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + } + + private class MockDirectoryCache implements Curator.DirectoryCache { + + /** The path this is caching and listening to */ + private final Path path; + + public MockDirectoryCache(Path path) { + this.path = path; + } + + @Override + public void start() {} + + @Override + public void addListener(PathChildrenCacheListener listener) { + listeners.add(path, listener); + } + + @Override + public List getCurrentData() { + List childData = new ArrayList<>(); + for (String childName : getChildren(path)) { + Path childPath = path.append(childName); + childData.add(new ChildData(childPath.getAbsolute(), null, getData(childPath).get())); + } + return childData; + } + + @Override + public ChildData getCurrentData(Path fullPath) { + if (!fullPath.getParentPath().equals(path)) { + throw new IllegalArgumentException("Path '" + fullPath + "' is not a child path of '" + path + "'"); + } + + return getData(fullPath).map(bytes -> new ChildData(fullPath.getAbsolute(), null, bytes)).orElse(null); + } + + @Override + public void close() {} + + private List getChildren(Path path) { + try { + return MockCuratorFramework.this.getChildren().forPath(path.getAbsolute()); + } catch (KeeperException.NoNodeException e) { + return List.of(); + } catch (Exception e) { + throw new RuntimeException("Could not get children of " + path.getAbsolute(), e); + } + } + + private Optional getData(Path path) { + try { + return Optional.of(MockCuratorFramework.this.getData().forPath(path.getAbsolute())); + } + catch (KeeperException.NoNodeException e) { + return Optional.empty(); + } + catch (Exception e) { + throw new RuntimeException("Could not get data at " + path.getAbsolute(), e); + } + } + + } + + private class MockFileCache implements Curator.FileCache { + + /** The path this is caching and listening to */ + private final Path path; + + public MockFileCache(Path path) { + this.path = path; + } + + @Override + public void start() {} + + @Override + public void addListener(NodeCacheListener listener) { + listeners.add(path, listener); + } + + @Override + public ChildData getCurrentData() { + MemoryFileSystem.Node node = fileSystem.root().getNode(Paths.get(path.toString()), false); + if (node == null) return null; + return new ChildData("/" + path.toString(), null, node.getContent()); + } + + @Override + public void close() {} + + } + + // ----- The rest of this file is adapting the Curator (non-recipe) API to the ----- + // ----- file system methods above. ----- + // ----- There's nothing to see unless you are interested in an illustration of ----- + // ----- the folly of fluent API's or, more generally, mankind. ----- + + private abstract static class MockBackgroundACLPathAndBytesableBuilder implements PathAndBytesable, ProtectACLCreateModePathAndBytesable { + + public BackgroundPathAndBytesable withACL(List list) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + public ACLBackgroundPathAndBytesable withMode(CreateMode createMode) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ACLCreateModeBackgroundPathAndBytesable withProtection() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + public T forPath(String s, byte[] bytes) throws Exception { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + public T forPath(String s) throws Exception { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + } + + private class MockCreateBuilder extends MockBackgroundACLPathAndBytesableBuilder implements CreateBuilder { + + private boolean createParents = false; + private CreateMode createMode = CreateMode.PERSISTENT; + + @Override + public ProtectACLCreateModePathAndBytesable creatingParentsIfNeeded() { + createParents = true; + return this; + } + + @Override + public ACLCreateModeBackgroundPathAndBytesable withProtection() { + // Protection against the server crashing after creating the file but before returning to the client. + // Not relevant for an in-memory mock, obviously + return this; + } + + public ACLBackgroundPathAndBytesable withMode(CreateMode createMode) { + this.createMode = createMode; + return this; + } + + @Override + public CreateBackgroundModeACLable compressed() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ProtectACLCreateModePathAndBytesable creatingParentContainersIfNeeded() { + // TODO: Add proper support for container nodes, see https://issues.apache.org/jira/browse/ZOOKEEPER-2163. + return creatingParentsIfNeeded(); + } + + @Override + @Deprecated + public ACLPathAndBytesable withProtectedEphemeralSequential() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + public String forPath(String s) throws Exception { + return createNode(s, new byte[0], createParents, createMode, fileSystem.root(), listeners); + } + + public String forPath(String s, byte[] bytes) throws Exception { + return createNode(s, bytes, createParents, createMode, fileSystem.root(), listeners); + } + + @Override + public ErrorListenerPathAndBytesable inBackground() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable inBackground(Object o) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback, Object o) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback, Executor executor) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + } + + private static class MockBackgroundPathableBuilder implements BackgroundPathable, Watchable> { + + @Override + public ErrorListenerPathable inBackground() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable inBackground(Object o) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable inBackground(BackgroundCallback backgroundCallback) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable inBackground(BackgroundCallback backgroundCallback, Object o) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable inBackground(BackgroundCallback backgroundCallback, Executor executor) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathable inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public T forPath(String s) throws Exception { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public BackgroundPathable watched() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public BackgroundPathable usingWatcher(Watcher watcher) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public BackgroundPathable usingWatcher(CuratorWatcher curatorWatcher) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + } + + private class MockGetChildrenBuilder extends MockBackgroundPathableBuilder> implements GetChildrenBuilder { + + @Override + public WatchPathable> storingStatIn(Stat stat) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public List forPath(String path) throws Exception { + return getChildren(path, fileSystem.root()); + } + + } + + private class MockExistsBuilder extends MockBackgroundPathableBuilder implements ExistsBuilder { + + @Override + public Stat forPath(String path) { + try { + MemoryFileSystem.Node node = getNode(path, fileSystem.root()); + Stat stat = new Stat(); + stat.setVersion(node.version()); + return stat; + } + catch (KeeperException.NoNodeException e) { + return null; + } + } + + @Override + public ExistsBuilderMain creatingParentContainersIfNeeded() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + } + + private class MockDeleteBuilder extends MockBackgroundPathableBuilder implements DeleteBuilder { + + private boolean deleteChildren = false; + + @Override + public BackgroundVersionable deletingChildrenIfNeeded() { + deleteChildren = true; + return this; + } + + @Override + public ChildrenDeletable guaranteed() { + return this; + } + + @Override + public BackgroundPathable withVersion(int i) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + public Void forPath(String pathString) throws Exception { + deleteNode(pathString, deleteChildren, fileSystem.root(), listeners); + return null; + } + + } + + private class MockGetDataBuilder extends MockBackgroundPathableBuilder implements GetDataBuilder { + + @Override + public GetDataWatchBackgroundStatable decompressed() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public WatchPathable storingStatIn(Stat stat) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + public byte[] forPath(String path) throws Exception { + return getData(path, fileSystem.root()); + } + + } + + private class MockSetDataBuilder extends MockBackgroundACLPathAndBytesableBuilder implements SetDataBuilder { + + @Override + public SetDataBackgroundVersionable compressed() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public BackgroundPathAndBytesable withVersion(int i) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public Stat forPath(String path, byte[] bytes) throws Exception { + setData(path, bytes, fileSystem.root(), listeners); + return null; + } + + @Override + public ErrorListenerPathAndBytesable inBackground() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable inBackground(Object o) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback, Object o) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback, Executor executor) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ErrorListenerPathAndBytesable inBackground(BackgroundCallback backgroundCallback, Object o, Executor executor) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + } + + /** Allows addition of directoryListeners which are never called */ + private static class MockListenable implements Listenable { + + @Override + public void addListener(T t) { + } + + @Override + public void addListener(T t, Executor executor) { + } + + @Override + public void removeListener(T t) { + } + + } + + private class MockCuratorTransactionFinal implements CuratorTransactionFinal { + + /** The new directory root in which the transactional changes are made */ + private final MemoryFileSystem.Node newRoot; + + private boolean committed = false; + + private final MockCuratorTransactionFinal.DelayedListener delayedListener = new MockCuratorTransactionFinal.DelayedListener(); + + public MockCuratorTransactionFinal() { + newRoot = fileSystem.root().clone(); + } + + @Override + public Collection commit() { + fileSystem.replaceRoot(newRoot); + committed = true; + delayedListener.commit(); + return null; // TODO + } + + @Override + public TransactionCreateBuilder create() { + ensureNotCommitted(); + return new MockCuratorTransactionFinal.MockTransactionCreateBuilder(); + } + + @Override + public TransactionDeleteBuilder delete() { + ensureNotCommitted(); + return new MockCuratorTransactionFinal.MockTransactionDeleteBuilder(); + } + + @Override + public TransactionSetDataBuilder setData() { + ensureNotCommitted(); + return new MockCuratorTransactionFinal.MockTransactionSetDataBuilder(); + } + + @Override + public TransactionCheckBuilder check() { + ensureNotCommitted(); + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + private void ensureNotCommitted() { + if (committed) throw new IllegalStateException("transaction already committed"); + } + + private class MockTransactionCreateBuilder implements TransactionCreateBuilder { + + private CreateMode createMode = CreateMode.PERSISTENT; + + @Override + public PathAndBytesable withACL(List list) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ACLCreateModePathAndBytesable compressed() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public ACLPathAndBytesable withMode(CreateMode createMode) { + this.createMode = createMode; + return this; + } + + @Override + public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception { + createNode(s, bytes, false, createMode, newRoot, delayedListener); + return new MockCuratorTransactionFinal.MockCuratorTransactionBridge(); + } + + @Override + public CuratorTransactionBridge forPath(String s) throws Exception { + createNode(s, new byte[0], false, createMode, newRoot, delayedListener); + return new MockCuratorTransactionFinal.MockCuratorTransactionBridge(); + } + + } + + private class MockTransactionDeleteBuilder implements TransactionDeleteBuilder { + + @Override + public Pathable withVersion(int i) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public CuratorTransactionBridge forPath(String path) throws Exception { + deleteNode(path, false, newRoot, delayedListener); + return new MockCuratorTransactionFinal.MockCuratorTransactionBridge(); + } + + } + + private class MockTransactionSetDataBuilder implements TransactionSetDataBuilder { + + @Override + public VersionPathAndBytesable compressed() { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public PathAndBytesable withVersion(int i) { + throw new UnsupportedOperationException("Not implemented in MockCurator"); + } + + @Override + public CuratorTransactionBridge forPath(String s, byte[] bytes) throws Exception { + MockCuratorFramework.this.setData(s, bytes, newRoot, delayedListener); + return new MockCuratorTransactionFinal.MockCuratorTransactionBridge(); + } + + @Override + public CuratorTransactionBridge forPath(String s) throws Exception { + MockCuratorFramework.this.setData(s, new byte[0], newRoot, delayedListener); + return new MockCuratorTransactionFinal.MockCuratorTransactionBridge(); + } + + } + + private class MockCuratorTransactionBridge implements CuratorTransactionBridge { + + @Override + public CuratorTransactionFinal and() { + return MockCuratorTransactionFinal.this; + } + + } + + /** A class which collects listen events and forwards them to the regular directoryListeners on commit */ + private class DelayedListener extends Listeners { + + private final List> events = new ArrayList<>(); + + @Override + public void notify(Path path, PathChildrenCacheEvent event) { + events.add(new Pair<>(path, event)); + } + + public void commit() { + for (Pair event : events) + listeners.notify(event.getFirst(), event.getSecond()); + } + + } + + } + +} -- cgit v1.2.3