summaryrefslogtreecommitdiffstats
path: root/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
diff options
context:
space:
mode:
Diffstat (limited to 'zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java')
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java142
1 files changed, 49 insertions, 93 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
index 04bd64219d4..90eec5760fc 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
@@ -3,10 +3,11 @@ package com.yahoo.vespa.curator;
import com.google.inject.Inject;
import com.yahoo.cloud.config.ConfigserverConfig;
+import com.yahoo.cloud.config.CuratorConfig;
import com.yahoo.io.IOUtils;
-import com.yahoo.net.HostName;
import com.yahoo.path.Path;
import com.yahoo.text.Utf8;
+import com.yahoo.vespa.curator.api.VespaCurator;
import com.yahoo.vespa.curator.recipes.CuratorCounter;
import com.yahoo.vespa.defaults.Defaults;
import com.yahoo.vespa.zookeeper.VespaZooKeeperServer;
@@ -31,6 +32,7 @@ import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -47,7 +49,7 @@ import java.util.logging.Logger;
* @author vegardh
* @author bratseth
*/
-public class Curator implements AutoCloseable {
+public class Curator implements VespaCurator, AutoCloseable {
private static final Logger LOG = Logger.getLogger(Curator.class.getName());
private static final File ZK_CLIENT_CONFIG_FILE = new File(Defaults.getDefaults().underVespaHome("conf/zookeeper/zookeeper-client.cfg"));
@@ -55,81 +57,67 @@ public class Curator implements AutoCloseable {
private static final Duration ZK_CONNECTION_TIMEOUT = Duration.ofSeconds(30);
private static final Duration BASE_SLEEP_TIME = Duration.ofSeconds(1);
private static final int MAX_RETRIES = 10;
+ private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES);
- protected final RetryPolicy retryPolicy;
+ protected final RetryPolicy retryPolicy = DEFAULT_RETRY_POLICY;
private final CuratorFramework curatorFramework;
- private final String connectionSpec; // May be a subset of the servers in the ensemble
- private final String zooKeeperEnsembleConnectionSpec;
- private final int zooKeeperEnsembleCount;
+ private final ConnectionSpec connectionSpec;
// All lock keys, to allow re-entrancy. This will grow forever, but this should be too slow to be a problem
private final ConcurrentHashMap<Path, Lock> locks = new ConcurrentHashMap<>();
/** Creates a curator instance from a comma-separated string of ZooKeeper host:port strings */
public static Curator create(String connectionSpec) {
- return new Curator(connectionSpec, connectionSpec, Optional.of(ZK_CLIENT_CONFIG_FILE));
+ return new Curator(ConnectionSpec.create(connectionSpec), Optional.of(ZK_CLIENT_CONFIG_FILE));
}
// For testing only, use Optional.empty for clientConfigFile parameter to create default zookeeper client config
public static Curator create(String connectionSpec, Optional<File> clientConfigFile) {
- return new Curator(connectionSpec, connectionSpec, clientConfigFile);
+ return new Curator(ConnectionSpec.create(connectionSpec), clientConfigFile);
}
- // Depend on ZooKeeperServer to make sure it is started first
- // TODO: Move zookeeperserver config out of configserverconfig (requires update of controller services.xml as well)
@Inject
- public Curator(ConfigserverConfig configserverConfig, VespaZooKeeperServer server) {
- this(configserverConfig, Optional.of(ZK_CLIENT_CONFIG_FILE));
+ public Curator(CuratorConfig curatorConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) {
+ // Depends on ZooKeeperServer to make sure it is started first
+ this(ConnectionSpec.create(curatorConfig.server(),
+ CuratorConfig.Server::hostname,
+ CuratorConfig.Server::port,
+ curatorConfig.zookeeperLocalhostAffinity()),
+ Optional.of(ZK_CLIENT_CONFIG_FILE));
}
- Curator(ConfigserverConfig configserverConfig, Optional<File> clientConfigFile) {
- this(createConnectionSpec(configserverConfig), createEnsembleConnectionSpec(configserverConfig), clientConfigFile);
+ // TODO: This can be removed when this package is no longer public API.
+ public Curator(ConfigserverConfig configserverConfig, @SuppressWarnings("unused") VespaZooKeeperServer server) {
+ this(ConnectionSpec.create(configserverConfig.zookeeperserver(),
+ ConfigserverConfig.Zookeeperserver::hostname,
+ ConfigserverConfig.Zookeeperserver::port,
+ configserverConfig.zookeeperLocalhostAffinity()),
+ Optional.of(ZK_CLIENT_CONFIG_FILE));
}
- private Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Optional<File> clientConfigFile) {
- this(connectionSpec,
- zooKeeperEnsembleConnectionSpec,
- (retryPolicy) -> CuratorFrameworkFactory
- .builder()
- .retryPolicy(retryPolicy)
- .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis())
- .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis())
- .connectString(connectionSpec)
- .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile)))
- .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8
- .build());
- }
-
- protected Curator(String connectionSpec,
- String zooKeeperEnsembleConnectionSpec,
- Function<RetryPolicy, CuratorFramework> curatorFactory) {
- this(connectionSpec, zooKeeperEnsembleConnectionSpec, curatorFactory,
- new ExponentialBackoffRetry((int) BASE_SLEEP_TIME.toMillis(), MAX_RETRIES));
+ protected Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec, Function<RetryPolicy, CuratorFramework> curatorFactory) {
+ this(ConnectionSpec.create(connectionSpec, zooKeeperEnsembleConnectionSpec), curatorFactory.apply(DEFAULT_RETRY_POLICY));
}
- private Curator(String connectionSpec,
- String zooKeeperEnsembleConnectionSpec,
- Function<RetryPolicy, CuratorFramework> curatorFactory,
- RetryPolicy retryPolicy) {
- this.connectionSpec = connectionSpec;
- this.retryPolicy = retryPolicy;
- this.curatorFramework = curatorFactory.apply(retryPolicy);
- if (this.curatorFramework != null) {
- validateConnectionSpec(connectionSpec);
- validateConnectionSpec(zooKeeperEnsembleConnectionSpec);
- addLoggingListener();
- curatorFramework.start();
- }
-
- this.zooKeeperEnsembleConnectionSpec = zooKeeperEnsembleConnectionSpec;
- this.zooKeeperEnsembleCount = zooKeeperEnsembleConnectionSpec.split(",").length;
+ Curator(ConnectionSpec connectionSpec, Optional<File> clientConfigFile) {
+ this(connectionSpec,
+ CuratorFrameworkFactory
+ .builder()
+ .retryPolicy(DEFAULT_RETRY_POLICY)
+ .sessionTimeoutMs((int) ZK_SESSION_TIMEOUT.toMillis())
+ .connectionTimeoutMs((int) ZK_CONNECTION_TIMEOUT.toMillis())
+ .connectString(connectionSpec.local())
+ .zookeeperFactory(new VespaZooKeeperFactory(createClientConfig(clientConfigFile)))
+ .dontUseContainerParents() // TODO: Remove when we know ZooKeeper 3.5 works fine, consider waiting until Vespa 8
+ .build());
}
- private static String createConnectionSpec(ConfigserverConfig configserverConfig) {
- return configserverConfig.zookeeperLocalhostAffinity()
- ? createConnectionSpecForLocalhost(configserverConfig)
- : createEnsembleConnectionSpec(configserverConfig);
+ private Curator(ConnectionSpec connectionSpec, CuratorFramework curatorFramework) {
+ this.connectionSpec = Objects.requireNonNull(connectionSpec);
+ this.curatorFramework = Objects.requireNonNull(curatorFramework);
+ addLoggingListener();
+ curatorFramework.start();
}
private static ZKClientConfig createClientConfig(Optional<File> clientConfigFile) {
@@ -148,39 +136,6 @@ public class Curator implements AutoCloseable {
}
}
- private static String createEnsembleConnectionSpec(ConfigserverConfig config) {
- StringBuilder connectionSpec = new StringBuilder();
- for (int i = 0; i < config.zookeeperserver().size(); i++) {
- if (connectionSpec.length() > 0) {
- connectionSpec.append(',');
- }
- ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i);
- connectionSpec.append(server.hostname());
- connectionSpec.append(':');
- connectionSpec.append(server.port());
- }
- return connectionSpec.toString();
- }
-
- static String createConnectionSpecForLocalhost(ConfigserverConfig config) {
- String thisServer = HostName.getLocalhost();
-
- for (int i = 0; i < config.zookeeperserver().size(); i++) {
- ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i);
- if (thisServer.equals(server.hostname())) {
- return String.format("%s:%d", server.hostname(), server.port());
- }
- }
-
- throw new IllegalArgumentException("Unable to create connect string to localhost: " +
- "There is no localhost server specified in config: " + config);
- }
-
- private static void validateConnectionSpec(String connectionSpec) {
- if (connectionSpec == null || connectionSpec.isEmpty())
- throw new IllegalArgumentException(String.format("Connections spec '%s' is not valid", connectionSpec));
- }
-
/**
* Returns the ZooKeeper "connect string" used by curator: a comma-separated list of
* host:port of ZooKeeper endpoints to connect to. This may be a subset of
@@ -189,7 +144,7 @@ public class Curator implements AutoCloseable {
*
* This may be empty but never null
*/
- public String connectionSpec() { return connectionSpec; }
+ public String connectionSpec() { return connectionSpec.local(); }
/** For internal use; prefer creating a {@link CuratorCounter} */
public DistributedAtomicLong createAtomicCounter(String path) {
@@ -243,13 +198,14 @@ public class Curator implements AutoCloseable {
* A convenience method which sets some content at a path.
* If the path and any of its parents does not exists they are created.
*/
+ // TODO: Use create().orSetData() in Curator 4 and later
public void set(Path path, byte[] data) {
+ if ( ! exists(path))
+ create(path);
+
String absolutePath = path.getAbsolute();
try {
- if ( ! exists(path))
- framework().create().creatingParentsIfNeeded().forPath(absolutePath, data);
- else
- framework().setData().forPath(absolutePath, data);
+ framework().setData().forPath(absolutePath, data);
} catch (Exception e) {
throw new RuntimeException("Could not set data at " + absolutePath, e);
}
@@ -432,7 +388,7 @@ public class Curator implements AutoCloseable {
* TODO: Move method out of this class.
*/
public String zooKeeperEnsembleConnectionSpec() {
- return zooKeeperEnsembleConnectionSpec;
+ return connectionSpec.ensemble();
}
/**
@@ -440,7 +396,7 @@ public class Curator implements AutoCloseable {
* WARNING: This may be different from the number of servers this Curator may connect to.
* TODO: Move method out of this class.
*/
- public int zooKeeperEnsembleCount() { return zooKeeperEnsembleCount; }
+ public int zooKeeperEnsembleCount() { return connectionSpec.ensembleSize(); }
private static Optional<String> getEnvironmentVariable(String variableName) {
return Optional.ofNullable(System.getenv().get(variableName))