diff options
Diffstat (limited to 'zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java')
-rw-r--r-- | zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java | 142 |
1 files changed, 49 insertions, 93 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java index 04bd64219d4..90eec5760fc 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java @@ -3,10 +3,11 @@ package com.yahoo.vespa.curator; import com.google.inject.Inject; import com.yahoo.cloud.config.ConfigserverConfig; +import com.yahoo.cloud.config.CuratorConfig; import com.yahoo.io.IOUtils; -import com.yahoo.net.HostName; import com.yahoo.path.Path; import com.yahoo.text.Utf8; +import com.yahoo.vespa.curator.api.VespaCurator; import com.yahoo.vespa.curator.recipes.CuratorCounter; import com.yahoo.vespa.defaults.Defaults; import com.yahoo.vespa.zookeeper.VespaZooKeeperServer; @@ -31,6 +32,7 @@ import java.io.File; import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -47,7 +49,7 @@ import java.util.logging.Logger; * @author vegardh * @author bratseth */ -public class Curator implements AutoCloseable { +public class Curator implements VespaCurator, AutoCloseable { private static final Logger LOG = Logger.getLogger(Curator.class.getName()); private static final File ZK_CLIENT_CONFIG_FILE = new File(Defaults.getDefaults().underVespaHome("conf/zookeeper/zookeeper-client.cfg")); @@ -55,81 +57,67 @@ public class Curator implements AutoCloseable { private static final Duration ZK_CONNECTION_TIMEOUT = Duration.ofSeconds(30); private static final Duration BASE_SLEEP_TIME = Duration.ofSeconds(1); private static final int MAX_RETRIES = 10; + private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES); - protected final RetryPolicy retryPolicy; + protected final RetryPolicy retryPolicy = DEFAULT_RETRY_POLICY; private final CuratorFramework curatorFramework; - private final String connectionSpec; // May be a subset of the servers in the ensemble - private final String zooKeeperEnsembleConnectionSpec; - private final int zooKeeperEnsembleCount; + private final ConnectionSpec connectionSpec; // All lock keys, to allow re-entrancy. This will grow forever, but this should be too slow to be a problem private final ConcurrentHashMap<Path, Lock> locks = new ConcurrentHashMap<>(); /** Creates a curator instance from a comma-separated string of ZooKeeper host:port strings */ public static Curator create(String connectionSpec) { - return new Curator(connectionSpec, connectionSpec, Optional.of(ZK_CLIENT_CONFIG_FILE)); + return new Curator(ConnectionSpec.create(connectionSpec), Optional.of(ZK_CLIENT_CONFIG_FILE)); } // For testing only, use Optional.empty for clientConfigFile parameter to create default zookeeper client config public static Curator create(String connectionSpec, Optional<File> clientConfigFile) { - return new Curator(connectionSpec, connectionSpec, clientConfigFile); + return new Curator(ConnectionSpec.create(connectionSpec), clientConfigFile); } - // Depend on ZooKeeperServer to make sure it is started first - // TODO: Move zookeeperserver config out of configserverconfig (requires update of controller services.xml as well) @Inject - public Curator(ConfigserverConfig configserverConfig, VespaZooKeeperServer server) { - this(configserverConfig, Optional.of(ZK_CLIENT_CONFIG_FILE)); + public Curator(CuratorConfig curatorConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) { + // Depends on ZooKeeperServer to make sure it is started first + this(ConnectionSpec.create(curatorConfig.server(), + CuratorConfig.Server::hostname, + CuratorConfig.Server::port, + curatorConfig.zookeeperLocalhostAffinity()), + Optional.of(ZK_CLIENT_CONFIG_FILE)); } - Curator(ConfigserverConfig configserverConfig, Optional<File> clientConfigFile) { - this(createConnectionSpec(configserverConfig), createEnsembleConnectionSpec(configserverConfig), clientConfigFile); + // TODO: This can be removed when this package is no longer public API. + public Curator(ConfigserverConfig configserverConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) { + this(ConnectionSpec.create(configserverConfig.zookeeperserver(), + ConfigserverConfig.Zookeeperserver::hostname, + ConfigserverConfig.Zookeeperserver::port, + configserverConfig.zookeeperLocalhostAffinity()), + Optional.of(ZK_CLIENT_CONFIG_FILE)); } - private Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Optional<File> clientConfigFile) { - this(connectionSpec, - zooKeeperEnsembleConnectionSpec, - (retryPolicy) -> CuratorFrameworkFactory - .builder() - .retryPolicy(retryPolicy) - .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis()) - .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis()) - .connectString(connectionSpec) - .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile))) - .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8 - .build()); - } - - protected Curator(String connectionSpec, - String zooKeeperEnsembleConnectionSpec, - Function<RetryPolicy, CuratorFramework> curatorFactory) { - this(connectionSpec, zooKeeperEnsembleConnectionSpec, curatorFactory, - new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES)); + protected Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Function<RetryPolicy, CuratorFramework> curatorFactory) { + this(ConnectionSpec.create(connectionSpec, zooKeeperEnsembleConnectionSpec), curatorFactory.apply(DEFAULT_RETRY_POLICY)); } - private Curator(String connectionSpec, - String zooKeeperEnsembleConnectionSpec, - Function<RetryPolicy, CuratorFramework> curatorFactory, - RetryPolicy retryPolicy) { - this.connectionSpec = connectionSpec; - this.retryPolicy = retryPolicy; - this.curatorFramework = curatorFactory.apply(retryPolicy); - if (this.curatorFramework != null) { - validateConnectionSpec(connectionSpec); - validateConnectionSpec(zooKeeperEnsembleConnectionSpec); - addLoggingListener(); - curatorFramework.start(); - } - - this.zooKeeperEnsembleConnectionSpec = zooKeeperEnsembleConnectionSpec; - this.zooKeeperEnsembleCount = zooKeeperEnsembleConnectionSpec.split(",").length; + Curator(ConnectionSpec connectionSpec, Optional<File> clientConfigFile) { + this(connectionSpec, + CuratorFrameworkFactory + .builder() + .retryPolicy(DEFAULT_RETRY_POLICY) + .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis()) + .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis()) + .connectString(connectionSpec.local()) + .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile))) + .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8 + .build()); } - private static String createConnectionSpec(ConfigserverConfig configserverConfig) { - return configserverConfig.zookeeperLocalhostAffinity() - ? createConnectionSpecForLocalhost(configserverConfig) - : createEnsembleConnectionSpec(configserverConfig); + private Curator(ConnectionSpec connectionSpec, CuratorFramework curatorFramework) { + this.connectionSpec = Objects.requireNonNull(connectionSpec); + this.curatorFramework = Objects.requireNonNull(curatorFramework); + addLoggingListener(); + curatorFramework.start(); } private static ZKClientConfig createClientConfig(Optional<File> clientConfigFile) { @@ -148,39 +136,6 @@ public class Curator implements AutoCloseable { } } - private static String createEnsembleConnectionSpec(ConfigserverConfig config) { - StringBuilder connectionSpec = new StringBuilder(); - for (int i = 0; i < config.zookeeperserver().size(); i++) { - if (connectionSpec.length() > 0) { - connectionSpec.append(','); - } - ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i); - connectionSpec.append(server.hostname()); - connectionSpec.append(':'); - connectionSpec.append(server.port()); - } - return connectionSpec.toString(); - } - - static String createConnectionSpecForLocalhost(ConfigserverConfig config) { - String thisServer = HostName.getLocalhost(); - - for (int i = 0; i < config.zookeeperserver().size(); i++) { - ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i); - if (thisServer.equals(server.hostname())) { - return String.format("%s:%d", server.hostname(), server.port()); - } - } - - throw new IllegalArgumentException("Unable to create connect string to localhost: " + - "There is no localhost server specified in config: " + config); - } - - private static void validateConnectionSpec(String connectionSpec) { - if (connectionSpec == null || connectionSpec.isEmpty()) - throw new IllegalArgumentException(String.format("Connections spec '%s' is not valid", connectionSpec)); - } - /** * Returns the ZooKeeper "connect string" used by curator: a comma-separated list of * host:port of ZooKeeper endpoints to connect to. This may be a subset of @@ -189,7 +144,7 @@ public class Curator implements AutoCloseable { * * This may be empty but never null */ - public String connectionSpec() { return connectionSpec; } + public String connectionSpec() { return connectionSpec.local(); } /** For internal use; prefer creating a {@link CuratorCounter} */ public DistributedAtomicLong createAtomicCounter(String path) { @@ -243,13 +198,14 @@ public class Curator implements AutoCloseable { * A convenience method which sets some content at a path. * If the path and any of its parents does not exists they are created. */ + // TODO: Use create().orSetData() in Curator 4 and later public void set(Path path, byte[] data) { + if ( ! exists(path)) + create(path); + String absolutePath = path.getAbsolute(); try { - if ( ! exists(path)) - framework().create().creatingParentsIfNeeded().forPath(absolutePath, data); - else - framework().setData().forPath(absolutePath, data); + framework().setData().forPath(absolutePath, data); } catch (Exception e) { throw new RuntimeException("Could not set data at " + absolutePath, e); } @@ -432,7 +388,7 @@ public class Curator implements AutoCloseable { * TODO: Move method out of this class. */ public String zooKeeperEnsembleConnectionSpec() { - return zooKeeperEnsembleConnectionSpec; + return connectionSpec.ensemble(); } /** @@ -440,7 +396,7 @@ public class Curator implements AutoCloseable { * WARNING: This may be different from the number of servers this Curator may connect to. * TODO: Move method out of this class. */ - public int zooKeeperEnsembleCount() { return zooKeeperEnsembleCount; } + public int zooKeeperEnsembleCount() { return connectionSpec.ensembleSize(); } private static Optional<String> getEnvironmentVariable(String variableName) { return Optional.ofNullable(System.getenv().get(variableName)) |