diff options
author | Martin Polden <mpolden@mpolden.no> | 2020-11-20 10:34:51 +0100 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2020-11-20 10:47:33 +0100 |
commit | 02371ebcb2754ce99ee8182a870ac00ccff3f97e (patch) | |
tree | 715c304caf54a76defde98d9489a58a77321d214 /zkfacade | |
parent | 401e4d935f36fb6fa2dcbb6155a9a472c2561c51 (diff) |
Extract ConnectionSpec
Diffstat (limited to 'zkfacade')
5 files changed, 207 insertions, 91 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java new file mode 100644 index 00000000000..4409291419a --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java @@ -0,0 +1,102 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator; + +import com.yahoo.net.HostName; + +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +/** + * A connection spec for Curator. + * + * @author mpolden + */ +class ConnectionSpec { + + private final String local; + private final String ensemble; + private final int ensembleSize; + + private ConnectionSpec(String local, String ensemble, int ensembleSize) { + this.local = requireNonEmpty(local, "local spec"); + this.ensemble = requireNonEmpty(ensemble, "ensemble spec"); + this.ensembleSize = ensembleSize; + } + + /** Returns the local spec. This may be a subset of the ensemble spec */ + public String local() { + return local; + } + + /** Returns the ensemble spec. This always contains all nodes in the ensemble */ + public String ensemble() { + return ensemble; + } + + /** Returns the number of servers in the ensemble */ + public int ensembleSize() { + return ensembleSize; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ConnectionSpec that = (ConnectionSpec) o; + return ensembleSize == that.ensembleSize && + local.equals(that.local) && + ensemble.equals(that.ensemble); + } + + @Override + public int hashCode() { + return Objects.hash(local, ensemble, ensembleSize); + } + + public static ConnectionSpec create(String spec) { + return create(spec, spec); + } + + public static ConnectionSpec create(String localSpec, String ensembleSpec) { + return new ConnectionSpec(localSpec, ensembleSpec, ensembleSpec.split(",").length); + } + + public static <T> ConnectionSpec create(List<T> servers, + Function<T, String> hostnameGetter, + Function<T, Integer> portGetter, + boolean localhostAffinity) { + String localSpec = createSpec(servers, hostnameGetter, portGetter, localhostAffinity); + String ensembleSpec = localhostAffinity ? createSpec(servers, hostnameGetter, portGetter, false) : localSpec; + return new ConnectionSpec(localSpec, ensembleSpec, servers.size()); + } + + private static <T> String createSpec(List<T> servers, + Function<T, String> hostnameGetter, + Function<T, Integer> portGetter, + boolean localhostAffinity) { + String thisServer = HostName.getLocalhost(); + StringBuilder connectionSpec = new StringBuilder(); + for (var server : servers) { + if (localhostAffinity && !thisServer.equals(hostnameGetter.apply(server))) continue; + connectionSpec.append(hostnameGetter.apply(server)); + connectionSpec.append(':'); + connectionSpec.append(portGetter.apply(server)); + connectionSpec.append(','); + } + if (localhostAffinity && connectionSpec.length() == 0) { + throw new IllegalArgumentException("Unable to create connect string to localhost: " + + "There is no localhost server specified in config"); + } + if (connectionSpec.length() > 0) { + connectionSpec.setLength(connectionSpec.length() - 1); // Remove trailing comma + } + return connectionSpec.toString(); + } + + private static String requireNonEmpty(String s, String field) { + if (Objects.requireNonNull(s).isEmpty()) throw new IllegalArgumentException(field + " must be non-empty"); + return s; + } + +} diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java index 6cbfa274c56..4aaae38f939 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java @@ -4,7 +4,6 @@ package com.yahoo.vespa.curator; import com.google.inject.Inject; import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.io.IOUtils; -import com.yahoo.net.HostName; import com.yahoo.path.Path; import com.yahoo.text.Utf8; import com.yahoo.vespa.curator.recipes.CuratorCounter; @@ -55,81 +54,71 @@ public class Curator implements AutoCloseable { private static final Duration ZK_CONNECTION_TIMEOUT = Duration.ofSeconds(30); private static final Duration BASE_SLEEP_TIME = Duration.ofSeconds(1); private static final int MAX_RETRIES = 10; + private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES); protected final RetryPolicy retryPolicy; private final CuratorFramework curatorFramework; - private final String connectionSpec; // May be a subset of the servers in the ensemble - private final String zooKeeperEnsembleConnectionSpec; - private final int zooKeeperEnsembleCount; + private final ConnectionSpec connectionSpec; // All lock keys, to allow re-entrancy. This will grow forever, but this should be too slow to be a problem private final ConcurrentHashMap<Path, Lock> locks = new ConcurrentHashMap<>(); /** Creates a curator instance from a comma-separated string of ZooKeeper host:port strings */ public static Curator create(String connectionSpec) { - return new Curator(connectionSpec, connectionSpec, Optional.of(ZK_CLIENT_CONFIG_FILE)); + return new Curator(ConnectionSpec.create(connectionSpec), Optional.of(ZK_CLIENT_CONFIG_FILE)); } // For testing only, use Optional.empty for clientConfigFile parameter to create default zookeeper client config public static Curator create(String connectionSpec, Optional<File> clientConfigFile) { - return new Curator(connectionSpec, connectionSpec, clientConfigFile); + return new Curator(ConnectionSpec.create(connectionSpec), clientConfigFile); } // Depend on ZooKeeperServer to make sure it is started first - // TODO: Move zookeeperserver config out of configserverconfig (requires update of controller services.xml as well) + // TODO: This can be removed when this package is no longer public API. @Inject - public Curator(ConfigserverConfig configserverConfig, VespaZooKeeperServer server) { + public Curator(ConfigserverConfig configserverConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) { this(configserverConfig, Optional.of(ZK_CLIENT_CONFIG_FILE)); } Curator(ConfigserverConfig configserverConfig, Optional<File> clientConfigFile) { - this(createConnectionSpec(configserverConfig), createEnsembleConnectionSpec(configserverConfig), clientConfigFile); - } - - private Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Optional<File> clientConfigFile) { - this(connectionSpec, - zooKeeperEnsembleConnectionSpec, - (retryPolicy) -> CuratorFrameworkFactory - .builder() - .retryPolicy(retryPolicy) - .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis()) - .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis()) - .connectString(connectionSpec) - .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile))) - .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8 - .build()); + this(ConnectionSpec.create(configserverConfig.zookeeperserver(), + ConfigserverConfig.Zookeeperserver::hostname, + ConfigserverConfig.Zookeeperserver::port, + configserverConfig.zookeeperLocalhostAffinity()), + clientConfigFile); } protected Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Function<RetryPolicy, CuratorFramework> curatorFactory) { - this(connectionSpec, zooKeeperEnsembleConnectionSpec, curatorFactory, - new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES)); + this(ConnectionSpec.create(connectionSpec, zooKeeperEnsembleConnectionSpec), curatorFactory, DEFAULT_RETRY_POLICY); } - private Curator(String connectionSpec, - String zooKeeperEnsembleConnectionSpec, + private Curator(ConnectionSpec connectionSpec, Optional<File> clientConfigFile) { + this(connectionSpec, + (retryPolicy) -> CuratorFrameworkFactory + .builder() + .retryPolicy(retryPolicy) + .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis()) + .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis()) + .connectString(connectionSpec.local()) + .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile))) + .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8 + .build(), + DEFAULT_RETRY_POLICY); + } + + private Curator(ConnectionSpec connectionSpec, Function<RetryPolicy, CuratorFramework> curatorFactory, RetryPolicy retryPolicy) { this.connectionSpec = connectionSpec; this.retryPolicy = retryPolicy; this.curatorFramework = curatorFactory.apply(retryPolicy); if (this.curatorFramework != null) { - validateConnectionSpec(connectionSpec); - validateConnectionSpec(zooKeeperEnsembleConnectionSpec); addLoggingListener(); curatorFramework.start(); } - - this.zooKeeperEnsembleConnectionSpec = zooKeeperEnsembleConnectionSpec; - this.zooKeeperEnsembleCount = zooKeeperEnsembleConnectionSpec.split(",").length; - } - - private static String createConnectionSpec(ConfigserverConfig configserverConfig) { - return configserverConfig.zookeeperLocalhostAffinity() - ? createConnectionSpecForLocalhost(configserverConfig) - : createEnsembleConnectionSpec(configserverConfig); } private static ZKClientConfig createClientConfig(Optional<File> clientConfigFile) { @@ -148,39 +137,6 @@ public class Curator implements AutoCloseable { } } - private static String createEnsembleConnectionSpec(ConfigserverConfig config) { - StringBuilder connectionSpec = new StringBuilder(); - for (int i = 0; i < config.zookeeperserver().size(); i++) { - if (connectionSpec.length() > 0) { - connectionSpec.append(','); - } - ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i); - connectionSpec.append(server.hostname()); - connectionSpec.append(':'); - connectionSpec.append(server.port()); - } - return connectionSpec.toString(); - } - - static String createConnectionSpecForLocalhost(ConfigserverConfig config) { - String thisServer = HostName.getLocalhost(); - - for (int i = 0; i < config.zookeeperserver().size(); i++) { - ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i); - if (thisServer.equals(server.hostname())) { - return String.format("%s:%d", server.hostname(), server.port()); - } - } - - throw new IllegalArgumentException("Unable to create connect string to localhost: " + - "There is no localhost server specified in config: " + config); - } - - private static void validateConnectionSpec(String connectionSpec) { - if (connectionSpec == null || connectionSpec.isEmpty()) - throw new IllegalArgumentException(String.format("Connections spec '%s' is not valid", connectionSpec)); - } - /** * Returns the ZooKeeper "connect string" used by curator: a comma-separated list of * host:port of ZooKeeper endpoints to connect to. This may be a subset of @@ -189,7 +145,7 @@ public class Curator implements AutoCloseable { * * This may be empty but never null */ - public String connectionSpec() { return connectionSpec; } + public String connectionSpec() { return connectionSpec.local(); } /** For internal use; prefer creating a {@link CuratorCounter} */ public DistributedAtomicLong createAtomicCounter(String path) { @@ -432,7 +388,7 @@ public class Curator implements AutoCloseable { * TODO: Move method out of this class. */ public String zooKeeperEnsembleConnectionSpec() { - return zooKeeperEnsembleConnectionSpec; + return connectionSpec.ensemble(); } /** @@ -440,7 +396,7 @@ public class Curator implements AutoCloseable { * WARNING: This may be different from the number of servers this Curator may connect to. * TODO: Move method out of this class. */ - public int zooKeeperEnsembleCount() { return zooKeeperEnsembleCount; } + public int zooKeeperEnsembleCount() { return connectionSpec.ensembleSize(); } private static Optional<String> getEnvironmentVariable(String variableName) { return Optional.ofNullable(System.getenv().get(variableName)) diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java index 3da7678c44e..8e3b433354d 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java @@ -137,7 +137,7 @@ public class MockCurator extends Curator { * This is not what ZooKeeper does. */ public MockCurator(boolean stableOrdering) { - super("", "", (retryPolicy) -> null); + super("host1:10001", "host1:10001", (retryPolicy) -> null); this.stableOrdering = stableOrdering; curatorFramework = new MockCuratorFramework(); curatorFramework.start(); diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java new file mode 100644 index 00000000000..a518d8df843 --- /dev/null +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java @@ -0,0 +1,74 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.curator; + +import com.yahoo.net.HostName; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * @author mpolden + */ +public class ConnectionSpecTest { + + @Test + public void create() { + HostName.setHostNameForTestingOnly("host2"); + Config config = new Config(List.of(new Config.Server("host1", 10001), + new Config.Server("host2", 10002), + new Config.Server("host3", 10003))); + + { + ConnectionSpec spec = ConnectionSpec.create(config.servers, Config.Server::hostname, Config.Server::port, false); + assertEquals("host1:10001,host2:10002,host3:10003", spec.local()); + assertEquals("host1:10001,host2:10002,host3:10003", spec.ensemble()); + assertEquals(3, spec.ensembleSize()); + } + + { + ConnectionSpec specLocalAffinity = ConnectionSpec.create(config.servers, Config.Server::hostname, Config.Server::port, true); + assertEquals("host2:10002", specLocalAffinity.local()); + assertEquals("host1:10001,host2:10002,host3:10003", specLocalAffinity.ensemble()); + assertEquals(3, specLocalAffinity.ensembleSize()); + } + + { + ConnectionSpec specFromString = ConnectionSpec.create("host1:10001", "host1:10001,host2:10002"); + assertEquals("host1:10001", specFromString.local()); + assertEquals("host1:10001,host2:10002", specFromString.ensemble()); + assertEquals(2, specFromString.ensembleSize()); + } + } + + private static class Config { + + private final List<Server> servers; + + public Config(List<Server> servers) { + this.servers = servers; + } + + private static class Server { + + private final String hostname; + private final int port; + + public Server(String hostname, int port) { + this.hostname = hostname; + this.port = port; + } + + public String hostname() { + return hostname; + } + + public int port() { + return port; + } + } + + } + +} diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java index 2bf40c4e2bb..1c7cb3695a8 100644 --- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java @@ -68,22 +68,6 @@ public class CuratorTest { } } - @Test - public void localhost_affinity() { - String localhostHostName = "myhost"; - int localhostPort = 123; - - ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder(); - builder.zookeeperserver(createZKBuilder(localhostHostName, localhostPort)); - builder.zookeeperserver(createZKBuilder("otherhost", 345)); - ConfigserverConfig config = new ConfigserverConfig(builder); - - HostName.setHostNameForTestingOnly(localhostHostName); - - String localhostSpec = localhostHostName + ":" + localhostPort; - assertEquals(localhostSpec, Curator.createConnectionSpecForLocalhost(config)); - } - private ConfigserverConfig createTestConfig() { ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder(); builder.zookeeperserver(createZKBuilder(localhost, port1)); |