diff options
author | Håkon Hallingstad <hakon@oath.com> | 2018-01-11 23:11:56 +0100 |
---|---|---|
committer | Håkon Hallingstad <hakon@oath.com> | 2018-01-11 23:11:56 +0100 |
commit | a6442d127d3bd311542842cdee2ee6dbba7b3629 (patch) | |
tree | 22387a253c0848efda52f3db7f00b4ed4f5d110a /zkfacade/src | |
parent | bd891f6ff6e029dca4083ca9205df8f36bda09ed (diff) |
Some Curator clients require ensemble connect string
Diffstat (limited to 'zkfacade/src')
3 files changed, 98 insertions, 84 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java index 4c932969460..5ba16232221 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java @@ -26,10 +26,10 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Function; /** * Curator interface for Vespa. @@ -47,77 +47,85 @@ public class Curator implements AutoCloseable { private static final int ZK_SESSION_TIMEOUT = 30000; private static final int ZK_CONNECTION_TIMEOUT = 30000; - private static final int baseSleepTime = 1000; //ms - private static final int maxRetries = 10; + private static final int BASE_SLEEP_TIME = 1000; //ms + private static final int MAX_RETRIES = 10; - private final CuratorFramework curatorFramework; protected final RetryPolicy retryPolicy; - private final String connectionSpec; - private final int serverCount; + private final CuratorFramework curatorFramework; + private final String connectionSpec; // May be a subset of the servers in the ensemble + + private final String zooKeeperEnsembleConnectionSpec; + private final int zooKeeperEnsembleCount; /** Creates a curator instance from a comma-separated string of ZooKeeper host:port strings */ public static Curator create(String connectionSpec) { - return new Curator(connectionSpec); + return new Curator(connectionSpec, connectionSpec); } // Depend on ZooKeeperServer to make sure it is started first // TODO: Move zookeeperserver config out of configserverconfig (requires update of controller services.xml as well) @Inject public Curator(ConfigserverConfig configserverConfig, ZooKeeperServer server) { - this(createConnectionSpec(configserverConfig)); + this(configserverConfig, createConnectionSpec(configserverConfig)); } - - static String createConnectionSpec(ConfigserverConfig config) { - String thisServer = HostName.getLocalhost(); - - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < config.zookeeperserver().size(); i++) { - ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i); - - String spec = String.format("%s:%d", server.hostname(), server.port()); - if (config.zookeeperLocalhostAffinity() && server.hostname().equals(thisServer)) { - // Only connect to localhost server if possible, to save network traffic - // and balance load. - return spec; - } + private Curator(ConfigserverConfig configserverConfig, String zooKeeperEnsembleConnectionSpec) { + this(configserverConfig.zookeeperLocalhostAffinity() ? + HostName.getLocalhost() : zooKeeperEnsembleConnectionSpec, + zooKeeperEnsembleConnectionSpec); + } - if (sb.length() > 0) { - sb.append(','); - } + private Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec) { + this(connectionSpec, + zooKeeperEnsembleConnectionSpec, + (retryPolicy) -> CuratorFrameworkFactory + .builder() + .retryPolicy(retryPolicy) + .sessionTimeoutMs(ZK_SESSION_TIMEOUT) + .connectionTimeoutMs(ZK_CONNECTION_TIMEOUT) + .connectString(connectionSpec) + .zookeeperFactory(new DNSResolvingFixerZooKeeperFactory(UNKNOWN_HOST_TIMEOUT_MILLIS)) + .build()); + } - sb.append(spec); - } - return sb.toString(); + protected Curator(String connectionSpec, + String zooKeeperEnsembleConnectionSpec, + Function<RetryPolicy, CuratorFramework> curatorFactory) { + this(connectionSpec, zooKeeperEnsembleConnectionSpec, curatorFactory, + new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES)); } - /** - * Create a curator instance which connects to the zookeeper servers given by a connection spec - * on the format "hostname1:port,hostname2:port" ... - */ - public Curator(String connectionSpec) { - Objects.requireNonNull(connectionSpec, "The curator connection spec cannot be null"); + private Curator(String connectionSpec, + String zooKeeperEnsembleConnectionSpec, + Function<RetryPolicy, CuratorFramework> curatorFactory, + RetryPolicy retryPolicy) { this.connectionSpec = connectionSpec; - this.serverCount = connectionSpec.split(",").length; - validateConnectionSpec(connectionSpec); - retryPolicy = new ExponentialBackoffRetry(baseSleepTime, maxRetries); - curatorFramework = CuratorFrameworkFactory.builder() - .retryPolicy(retryPolicy) - .sessionTimeoutMs(ZK_SESSION_TIMEOUT) - .connectionTimeoutMs(ZK_CONNECTION_TIMEOUT) - .connectString(connectionSpec) - .zookeeperFactory(new DNSResolvingFixerZooKeeperFactory(UNKNOWN_HOST_TIMEOUT_MILLIS)) - .build(); - addFakeListener(); - curatorFramework.start(); + this.retryPolicy = retryPolicy; + this.curatorFramework = curatorFactory.apply(retryPolicy); + if (this.curatorFramework != null) { + validateConnectionSpec(connectionSpec); + validateConnectionSpec(zooKeeperEnsembleConnectionSpec); + addFakeListener(); + curatorFramework.start(); + } + + this.zooKeeperEnsembleConnectionSpec = zooKeeperEnsembleConnectionSpec; + this.zooKeeperEnsembleCount = zooKeeperEnsembleConnectionSpec.split(",").length; } - protected Curator() { - this.connectionSpec = ""; - this.serverCount = 0; - retryPolicy = new ExponentialBackoffRetry(baseSleepTime, maxRetries); - curatorFramework = null; + static String createConnectionSpec(ConfigserverConfig config) { + StringBuilder connectionSpec = new StringBuilder(); + for (int i = 0; i < config.zookeeperserver().size(); i++) { + if (connectionSpec.length() > 0) { + connectionSpec.append(','); + } + ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i); + connectionSpec.append(server.hostname()); + connectionSpec.append(':'); + connectionSpec.append(server.port()); + } + return connectionSpec.toString(); } private static void validateConnectionSpec(String connectionSpec) { @@ -125,18 +133,19 @@ public class Curator implements AutoCloseable { throw new IllegalArgumentException(String.format("Connections spec '%s' is not valid", connectionSpec)); } - /** Returns the number of zooKeeper servers in this cluster */ - public int serverCount() { return serverCount; } - - /** - * Returns the servers in this cluster as a comma-separated list of host:port strings. + /** + * Returns the ZooKeeper "connect string" used by curator: a comma-separated list of + * host:port of ZooKeeper endpoints to connect to. This may be a subset of + * zooKeeperEnsembleConnectionSpec() if there's some affinity, e.g. for + * performance reasons. + * * This may be empty but never null */ public String connectionSpec() { return connectionSpec; } /** For internal use; prefer creating a {@link CuratorCounter} */ public DistributedAtomicLong createAtomicCounter(String path) { - return new DistributedAtomicLong(curatorFramework, path, new ExponentialBackoffRetry(baseSleepTime, maxRetries)); + return new DistributedAtomicLong(curatorFramework, path, new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES)); } /** For internal use; prefer creating a {@link com.yahoo.vespa.curator.recipes.CuratorLock} */ @@ -339,4 +348,19 @@ public class Curator implements AutoCloseable { } + /** + * @return The non-null connect string containing all ZooKeeper servers in the ensemble. + * WARNING: This may be different from the servers this Curator may connect to. + * TODO: Move method out of this class. + */ + public String zooKeeperEnsembleConnectionSpec() { + return zooKeeperEnsembleConnectionSpec; + } + + /** + * Returns the number of zooKeeper servers in this ensemble. + * WARNING: This may be different from the number of servers this Curator may connect to. + * TODO: Move method out of this class. + */ + public int zooKeeperEnsembleCount() { return zooKeeperEnsembleCount; } } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java index ccd02e5c6d6..4013cf1d649 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java @@ -7,8 +7,6 @@ import com.yahoo.collections.Pair; import com.yahoo.concurrent.Lock; import com.yahoo.concurrent.Locks; import com.yahoo.path.Path; -import static com.yahoo.vespa.curator.mock.MemoryFileSystem.Node; - import com.yahoo.vespa.curator.CompletionTimeoutException; import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.curator.recipes.CuratorLockException; @@ -86,6 +84,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static com.yahoo.vespa.curator.mock.MemoryFileSystem.Node; + /** * <p>A <b>non thread safe</b> mock of the curator API. * The methods are implemented lazily, due to laziness. @@ -106,7 +106,7 @@ public class MockCurator extends Curator { private boolean shouldTimeoutOnEnter = false; private int monotonicallyIncreasingNumber = 0; private final boolean stableOrdering; - private String connectionSpec = ""; + private String zooKeeperEnsembleConnectionSpec = ""; private final Locks<String> locks = new Locks<>(Long.MAX_VALUE, TimeUnit.DAYS); /** The file system used by this mock to store zookeeper files and directories */ @@ -133,6 +133,7 @@ public class MockCurator extends Curator { * This is not what ZooKeeper does. */ public MockCurator(boolean stableOrdering) { + super("", "", (retryPolicy) -> null); this.stableOrdering = stableOrdering; curatorFramework = new MockCuratorFramework(); curatorFramework.start(); @@ -152,11 +153,18 @@ public class MockCurator extends Curator { return Optional.ofNullable(atomicCounters.get(path)); } - /** Assigns the connection string, which must be on the form host1:port,host2:port ... */ - public void setConnectionSpec(String connectionSpec) { this.connectionSpec = connectionSpec; } + /** + * Sets the ZooKeeper ensemble connection spec, which must be on the form + * host1:port,host2:port ... + */ + public void setZooKeeperEnsembleConnectionSpec(String ensembleSpec) { + this.zooKeeperEnsembleConnectionSpec = ensembleSpec; + } @Override - public String connectionSpec() { return connectionSpec; } + public String zooKeeperEnsembleConnectionSpec() { + return zooKeeperEnsembleConnectionSpec; + } // ----- Start of adaptor methods from Curator to the mock file system ----- @@ -368,7 +376,7 @@ public class MockCurator extends Curator { public void notify(Path path, PathChildrenCacheEvent event) { try { // Snapshot directoryListeners in case notification leads to new directoryListeners added - Set<Map.Entry<Path, PathChildrenCacheListener>>directoryLlistenerSnapshot = new HashSet<>(directoryListeners.entrySet()); + Set<Map.Entry<Path, PathChildrenCacheListener>> directoryLlistenerSnapshot = new HashSet<>(directoryListeners.entrySet()); for (Map.Entry<Path, PathChildrenCacheListener> listener : directoryLlistenerSnapshot) { if (path.isChildOf(listener.getKey())) listener.getValue().childEvent(curatorFramework, event); diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java index 1899dcfe7cd..2fc4c2a7fc4 100644 --- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.curator; import com.yahoo.cloud.config.ConfigserverConfig; -import com.yahoo.net.HostName; import org.apache.curator.test.TestingServer; import org.junit.After; import org.junit.Before; @@ -60,7 +59,7 @@ public class CuratorTest { public void require_that_curator_can_produce_spec() { try (Curator curator = createCurator(createTestConfig())) { assertThat(curator.connectionSpec(), is(spec1 + "," + spec2)); - assertThat(curator.serverCount(), is(2)); + assertThat(curator.zooKeeperEnsembleCount(), is(2)); } } @@ -69,27 +68,10 @@ public class CuratorTest { ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder(); builder.zookeeperserver(createZKBuilder("localhost", port1)); try (Curator curator = createCurator(new ConfigserverConfig(builder))) { - assertThat(curator.serverCount(), is(1)); + assertThat(curator.zooKeeperEnsembleCount(), is(1)); } } - @Test - public void localhost_affinity() { - String localhostHostName = "myhost"; - int localhostPort = 123; - String localhostSpec = localhostHostName + ":" + localhostPort; - - ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder(); - builder.zookeeperserver(createZKBuilder(localhostHostName, localhostPort)); - builder.zookeeperserver(createZKBuilder("otherhost", 345)); - builder.zookeeperLocalhostAffinity(true); - ConfigserverConfig config = new ConfigserverConfig(builder); - - HostName.setHostNameForTestingOnly(localhostHostName); - - assertThat(Curator.createConnectionSpec(config), is(localhostSpec)); - } - private ConfigserverConfig createTestConfig() { ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder(); builder.zookeeperserver(createZKBuilder("localhost", port1)); |