summaryrefslogtreecommitdiffstats
path: root/zkfacade
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2020-11-20 10:34:51 +0100
committerMartin Polden <mpolden@mpolden.no>2020-11-20 10:47:33 +0100
commit02371ebcb2754ce99ee8182a870ac00ccff3f97e (patch)
tree715c304caf54a76defde98d9489a58a77321d214 /zkfacade
parent401e4d935f36fb6fa2dcbb6155a9a472c2561c51 (diff)
Extract ConnectionSpec
Diffstat (limited to 'zkfacade')
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java102
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java104
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java2
-rw-r--r--zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java74
-rw-r--r--zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java16
5 files changed, 207 insertions, 91 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java
new file mode 100644
index 00000000000..4409291419a
--- /dev/null
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/ConnectionSpec.java
@@ -0,0 +1,102 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.curator;
+
+import com.yahoo.net.HostName;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * A connection spec for Curator.
+ *
+ * @author mpolden
+ */
+class ConnectionSpec {
+
+ private final String local;
+ private final String ensemble;
+ private final int ensembleSize;
+
+ private ConnectionSpec(String local, String ensemble, int ensembleSize) {
+ this.local = requireNonEmpty(local, "local spec");
+ this.ensemble = requireNonEmpty(ensemble, "ensemble spec");
+ this.ensembleSize = ensembleSize;
+ }
+
+ /** Returns the local spec. This may be a subset of the ensemble spec */
+ public String local() {
+ return local;
+ }
+
+ /** Returns the ensemble spec. This always contains all nodes in the ensemble */
+ public String ensemble() {
+ return ensemble;
+ }
+
+ /** Returns the number of servers in the ensemble */
+ public int ensembleSize() {
+ return ensembleSize;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ConnectionSpec that = (ConnectionSpec) o;
+ return ensembleSize == that.ensembleSize &&
+ local.equals(that.local) &&
+ ensemble.equals(that.ensemble);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(local, ensemble, ensembleSize);
+ }
+
+ public static ConnectionSpec create(String spec) {
+ return create(spec, spec);
+ }
+
+ public static ConnectionSpec create(String localSpec, String ensembleSpec) {
+ return new ConnectionSpec(localSpec, ensembleSpec, ensembleSpec.split(",").length);
+ }
+
+ public static <T> ConnectionSpec create(List<T> servers,
+ Function<T, String> hostnameGetter,
+ Function<T, Integer> portGetter,
+ boolean localhostAffinity) {
+ String localSpec = createSpec(servers, hostnameGetter, portGetter, localhostAffinity);
+ String ensembleSpec = localhostAffinity ? createSpec(servers, hostnameGetter, portGetter, false) : localSpec;
+ return new ConnectionSpec(localSpec, ensembleSpec, servers.size());
+ }
+
+ private static <T> String createSpec(List<T> servers,
+ Function<T, String> hostnameGetter,
+ Function<T, Integer> portGetter,
+ boolean localhostAffinity) {
+ String thisServer = HostName.getLocalhost();
+ StringBuilder connectionSpec = new StringBuilder();
+ for (var server : servers) {
+ if (localhostAffinity && !thisServer.equals(hostnameGetter.apply(server))) continue;
+ connectionSpec.append(hostnameGetter.apply(server));
+ connectionSpec.append(':');
+ connectionSpec.append(portGetter.apply(server));
+ connectionSpec.append(',');
+ }
+ if (localhostAffinity && connectionSpec.length() == 0) {
+ throw new IllegalArgumentException("Unable to create connect string to localhost: " +
+ "There is no localhost server specified in config");
+ }
+ if (connectionSpec.length() > 0) {
+ connectionSpec.setLength(connectionSpec.length() - 1); // Remove trailing comma
+ }
+ return connectionSpec.toString();
+ }
+
+ private static String requireNonEmpty(String s, String field) {
+ if (Objects.requireNonNull(s).isEmpty()) throw new IllegalArgumentException(field + " must be non-empty");
+ return s;
+ }
+
+}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
index 6cbfa274c56..4aaae38f939 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
@@ -4,7 +4,6 @@ package com.yahoo.vespa.curator;
import com.google.inject.Inject;
import com.yahoo.cloud.config.ConfigserverConfig;
import com.yahoo.io.IOUtils;
-import com.yahoo.net.HostName;
import com.yahoo.path.Path;
import com.yahoo.text.Utf8;
import com.yahoo.vespa.curator.recipes.CuratorCounter;
@@ -55,81 +54,71 @@ public class Curator implements AutoCloseable {
private static final Duration ZK_CONNECTION_TIMEOUT = Duration.ofSeconds(30);
private static final Duration BASE_SLEEP_TIME = Duration.ofSeconds(1);
private static final int MAX_RETRIES = 10;
+ private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES);
protected final RetryPolicy retryPolicy;
private final CuratorFramework curatorFramework;
- private final String connectionSpec; // May be a subset of the servers in the ensemble
- private final String zooKeeperEnsembleConnectionSpec;
- private final int zooKeeperEnsembleCount;
+ private final ConnectionSpec connectionSpec;
// All lock keys, to allow re-entrancy. This will grow forever, but this should be too slow to be a problem
private final ConcurrentHashMap<Path, Lock> locks = new ConcurrentHashMap<>();
/** Creates a curator instance from a comma-separated string of ZooKeeper host:port strings */
public static Curator create(String connectionSpec) {
- return new Curator(connectionSpec, connectionSpec, Optional.of(ZK_CLIENT_CONFIG_FILE));
+ return new Curator(ConnectionSpec.create(connectionSpec), Optional.of(ZK_CLIENT_CONFIG_FILE));
}
// For testing only, use Optional.empty for clientConfigFile parameter to create default zookeeper client config
public static Curator create(String connectionSpec, Optional<File> clientConfigFile) {
- return new Curator(connectionSpec, connectionSpec, clientConfigFile);
+ return new Curator(ConnectionSpec.create(connectionSpec), clientConfigFile);
}
// Depend on ZooKeeperServer to make sure it is started first
- // TODO: Move zookeeperserver config out of configserverconfig (requires update of controller services.xml as well)
+ // TODO: This can be removed when this package is no longer public API.
@Inject
- public Curator(ConfigserverConfig configserverConfig, VespaZooKeeperServer server) {
+ public Curator(ConfigserverConfig configserverConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) {
this(configserverConfig, Optional.of(ZK_CLIENT_CONFIG_FILE));
}
Curator(ConfigserverConfig configserverConfig, Optional<File> clientConfigFile) {
- this(createConnectionSpec(configserverConfig), createEnsembleConnectionSpec(configserverConfig), clientConfigFile);
- }
-
- private Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Optional<File> clientConfigFile) {
- this(connectionSpec,
- zooKeeperEnsembleConnectionSpec,
- (retryPolicy) -> CuratorFrameworkFactory
- .builder()
- .retryPolicy(retryPolicy)
- .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis())
- .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis())
- .connectString(connectionSpec)
- .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile)))
- .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8
- .build());
+ this(ConnectionSpec.create(configserverConfig.zookeeperserver(),
+ ConfigserverConfig.Zookeeperserver::hostname,
+ ConfigserverConfig.Zookeeperserver::port,
+ configserverConfig.zookeeperLocalhostAffinity()),
+ clientConfigFile);
}
protected Curator(String connectionSpec,
String zooKeeperEnsembleConnectionSpec,
Function<RetryPolicy, CuratorFramework> curatorFactory) {
- this(connectionSpec, zooKeeperEnsembleConnectionSpec, curatorFactory,
- new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES));
+ this(ConnectionSpec.create(connectionSpec, zooKeeperEnsembleConnectionSpec), curatorFactory, DEFAULT_RETRY_POLICY);
}
- private Curator(String connectionSpec,
- String zooKeeperEnsembleConnectionSpec,
+ private Curator(ConnectionSpec connectionSpec, Optional<File> clientConfigFile) {
+ this(connectionSpec,
+ (retryPolicy) -> CuratorFrameworkFactory
+ .builder()
+ .retryPolicy(retryPolicy)
+ .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis())
+ .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis())
+ .connectString(connectionSpec.local())
+ .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile)))
+ .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8
+ .build(),
+ DEFAULT_RETRY_POLICY);
+ }
+
+ private Curator(ConnectionSpec connectionSpec,
Function<RetryPolicy, CuratorFramework> curatorFactory,
RetryPolicy retryPolicy) {
this.connectionSpec = connectionSpec;
this.retryPolicy = retryPolicy;
this.curatorFramework = curatorFactory.apply(retryPolicy);
if (this.curatorFramework != null) {
- validateConnectionSpec(connectionSpec);
- validateConnectionSpec(zooKeeperEnsembleConnectionSpec);
addLoggingListener();
curatorFramework.start();
}
-
- this.zooKeeperEnsembleConnectionSpec = zooKeeperEnsembleConnectionSpec;
- this.zooKeeperEnsembleCount = zooKeeperEnsembleConnectionSpec.split(",").length;
- }
-
- private static String createConnectionSpec(ConfigserverConfig configserverConfig) {
- return configserverConfig.zookeeperLocalhostAffinity()
- ? createConnectionSpecForLocalhost(configserverConfig)
- : createEnsembleConnectionSpec(configserverConfig);
}
private static ZKClientConfig createClientConfig(Optional<File> clientConfigFile) {
@@ -148,39 +137,6 @@ public class Curator implements AutoCloseable {
}
}
- private static String createEnsembleConnectionSpec(ConfigserverConfig config) {
- StringBuilder connectionSpec = new StringBuilder();
- for (int i = 0; i < config.zookeeperserver().size(); i++) {
- if (connectionSpec.length() > 0) {
- connectionSpec.append(',');
- }
- ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i);
- connectionSpec.append(server.hostname());
- connectionSpec.append(':');
- connectionSpec.append(server.port());
- }
- return connectionSpec.toString();
- }
-
- static String createConnectionSpecForLocalhost(ConfigserverConfig config) {
- String thisServer = HostName.getLocalhost();
-
- for (int i = 0; i < config.zookeeperserver().size(); i++) {
- ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i);
- if (thisServer.equals(server.hostname())) {
- return String.format("%s:%d", server.hostname(), server.port());
- }
- }
-
- throw new IllegalArgumentException("Unable to create connect string to localhost: " +
- "There is no localhost server specified in config: " + config);
- }
-
- private static void validateConnectionSpec(String connectionSpec) {
- if (connectionSpec == null || connectionSpec.isEmpty())
- throw new IllegalArgumentException(String.format("Connections spec '%s' is not valid", connectionSpec));
- }
-
/**
* Returns the ZooKeeper "connect string" used by curator: a comma-separated list of
* host:port of ZooKeeper endpoints to connect to. This may be a subset of
@@ -189,7 +145,7 @@ public class Curator implements AutoCloseable {
*
* This may be empty but never null
*/
- public String connectionSpec() { return connectionSpec; }
+ public String connectionSpec() { return connectionSpec.local(); }
/** For internal use; prefer creating a {@link CuratorCounter} */
public DistributedAtomicLong createAtomicCounter(String path) {
@@ -432,7 +388,7 @@ public class Curator implements AutoCloseable {
* TODO: Move method out of this class.
*/
public String zooKeeperEnsembleConnectionSpec() {
- return zooKeeperEnsembleConnectionSpec;
+ return connectionSpec.ensemble();
}
/**
@@ -440,7 +396,7 @@ public class Curator implements AutoCloseable {
* WARNING: This may be different from the number of servers this Curator may connect to.
* TODO: Move method out of this class.
*/
- public int zooKeeperEnsembleCount() { return zooKeeperEnsembleCount; }
+ public int zooKeeperEnsembleCount() { return connectionSpec.ensembleSize(); }
private static Optional<String> getEnvironmentVariable(String variableName) {
return Optional.ofNullable(System.getenv().get(variableName))
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
index 3da7678c44e..8e3b433354d 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
@@ -137,7 +137,7 @@ public class MockCurator extends Curator {
* This is not what ZooKeeper does.
*/
public MockCurator(boolean stableOrdering) {
- super("", "", (retryPolicy) -> null);
+ super("host1:10001", "host1:10001", (retryPolicy) -> null);
this.stableOrdering = stableOrdering;
curatorFramework = new MockCuratorFramework();
curatorFramework.start();
diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java
new file mode 100644
index 00000000000..a518d8df843
--- /dev/null
+++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/ConnectionSpecTest.java
@@ -0,0 +1,74 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.curator;
+
+import com.yahoo.net.HostName;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author mpolden
+ */
+public class ConnectionSpecTest {
+
+ @Test
+ public void create() {
+ HostName.setHostNameForTestingOnly("host2");
+ Config config = new Config(List.of(new Config.Server("host1", 10001),
+ new Config.Server("host2", 10002),
+ new Config.Server("host3", 10003)));
+
+ {
+ ConnectionSpec spec = ConnectionSpec.create(config.servers, Config.Server::hostname, Config.Server::port, false);
+ assertEquals("host1:10001,host2:10002,host3:10003", spec.local());
+ assertEquals("host1:10001,host2:10002,host3:10003", spec.ensemble());
+ assertEquals(3, spec.ensembleSize());
+ }
+
+ {
+ ConnectionSpec specLocalAffinity = ConnectionSpec.create(config.servers, Config.Server::hostname, Config.Server::port, true);
+ assertEquals("host2:10002", specLocalAffinity.local());
+ assertEquals("host1:10001,host2:10002,host3:10003", specLocalAffinity.ensemble());
+ assertEquals(3, specLocalAffinity.ensembleSize());
+ }
+
+ {
+ ConnectionSpec specFromString = ConnectionSpec.create("host1:10001", "host1:10001,host2:10002");
+ assertEquals("host1:10001", specFromString.local());
+ assertEquals("host1:10001,host2:10002", specFromString.ensemble());
+ assertEquals(2, specFromString.ensembleSize());
+ }
+ }
+
+ private static class Config {
+
+ private final List<Server> servers;
+
+ public Config(List<Server> servers) {
+ this.servers = servers;
+ }
+
+ private static class Server {
+
+ private final String hostname;
+ private final int port;
+
+ public Server(String hostname, int port) {
+ this.hostname = hostname;
+ this.port = port;
+ }
+
+ public String hostname() {
+ return hostname;
+ }
+
+ public int port() {
+ return port;
+ }
+ }
+
+ }
+
+}
diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java
index 2bf40c4e2bb..1c7cb3695a8 100644
--- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java
+++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java
@@ -68,22 +68,6 @@ public class CuratorTest {
}
}
- @Test
- public void localhost_affinity() {
- String localhostHostName = "myhost";
- int localhostPort = 123;
-
- ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder();
- builder.zookeeperserver(createZKBuilder(localhostHostName, localhostPort));
- builder.zookeeperserver(createZKBuilder("otherhost", 345));
- ConfigserverConfig config = new ConfigserverConfig(builder);
-
- HostName.setHostNameForTestingOnly(localhostHostName);
-
- String localhostSpec = localhostHostName + ":" + localhostPort;
- assertEquals(localhostSpec, Curator.createConnectionSpecForLocalhost(config));
- }
-
private ConfigserverConfig createTestConfig() {
ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder();
builder.zookeeperserver(createZKBuilder(localhost, port1));