summaryrefslogtreecommitdiffstats
path: root/zkfacade
diff options
context:
space:
mode:
authorHåkon Hallingstad <hakon@oath.com>2018-01-11 23:11:56 +0100
committerHåkon Hallingstad <hakon@oath.com>2018-01-11 23:11:56 +0100
commita6442d127d3bd311542842cdee2ee6dbba7b3629 (patch)
tree22387a253c0848efda52f3db7f00b4ed4f5d110a /zkfacade
parentbd891f6ff6e029dca4083ca9205df8f36bda09ed (diff)
Some Curator clients require ensemble connect string
Diffstat (limited to 'zkfacade')
-rw-r--r--zkfacade/pom.xml1
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java138
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java22
-rw-r--r--zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java22
4 files changed, 98 insertions, 85 deletions
diff --git a/zkfacade/pom.xml b/zkfacade/pom.xml
index f9cb8c42688..0aa689d896d 100644
--- a/zkfacade/pom.xml
+++ b/zkfacade/pom.xml
@@ -77,7 +77,6 @@
<arg>-Xlint:all</arg>
<arg>-Xlint:-serial</arg>
<arg>-Xlint:-try</arg>
- <arg>-Werror</arg>
</compilerArgs>
</configuration>
</plugin>
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
index 4c932969460..5ba16232221 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java
@@ -26,10 +26,10 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
/**
* Curator interface for Vespa.
@@ -47,77 +47,85 @@ public class Curator implements AutoCloseable {
private static final int ZK_SESSION_TIMEOUT = 30000;
private static final int ZK_CONNECTION_TIMEOUT = 30000;
- private static final int baseSleepTime = 1000; //ms
- private static final int maxRetries = 10;
+ private static final int BASE_SLEEP_TIME = 1000; //ms
+ private static final int MAX_RETRIES = 10;
- private final CuratorFramework curatorFramework;
protected final RetryPolicy retryPolicy;
- private final String connectionSpec;
- private final int serverCount;
+ private final CuratorFramework curatorFramework;
+ private final String connectionSpec; // May be a subset of the servers in the ensemble
+
+ private final String zooKeeperEnsembleConnectionSpec;
+ private final int zooKeeperEnsembleCount;
/** Creates a curator instance from a comma-separated string of ZooKeeper host:port strings */
public static Curator create(String connectionSpec) {
- return new Curator(connectionSpec);
+ return new Curator(connectionSpec, connectionSpec);
}
// Depend on ZooKeeperServer to make sure it is started first
// TODO: Move zookeeperserver config out of configserverconfig (requires update of controller services.xml as well)
@Inject
public Curator(ConfigserverConfig configserverConfig, ZooKeeperServer server) {
- this(createConnectionSpec(configserverConfig));
+ this(configserverConfig, createConnectionSpec(configserverConfig));
}
-
- static String createConnectionSpec(ConfigserverConfig config) {
- String thisServer = HostName.getLocalhost();
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < config.zookeeperserver().size(); i++) {
- ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i);
-
- String spec = String.format("%s:%d", server.hostname(), server.port());
- if (config.zookeeperLocalhostAffinity() && server.hostname().equals(thisServer)) {
- // Only connect to localhost server if possible, to save network traffic
- // and balance load.
- return spec;
- }
+ private Curator(ConfigserverConfig configserverConfig, String zooKeeperEnsembleConnectionSpec) {
+ this(configserverConfig.zookeeperLocalhostAffinity() ?
+ HostName.getLocalhost() : zooKeeperEnsembleConnectionSpec,
+ zooKeeperEnsembleConnectionSpec);
+ }
- if (sb.length() > 0) {
- sb.append(',');
- }
+ private Curator(String connectionSpec, String zooKeeperEnsembleConnectionSpec) {
+ this(connectionSpec,
+ zooKeeperEnsembleConnectionSpec,
+ (retryPolicy) -> CuratorFrameworkFactory
+ .builder()
+ .retryPolicy(retryPolicy)
+ .sessionTimeoutMs(ZK_SESSION_TIMEOUT)
+ .connectionTimeoutMs(ZK_CONNECTION_TIMEOUT)
+ .connectString(connectionSpec)
+ .zookeeperFactory(new DNSResolvingFixerZooKeeperFactory(UNKNOWN_HOST_TIMEOUT_MILLIS))
+ .build());
+ }
- sb.append(spec);
- }
- return sb.toString();
+ protected Curator(String connectionSpec,
+ String zooKeeperEnsembleConnectionSpec,
+ Function<RetryPolicy, CuratorFramework> curatorFactory) {
+ this(connectionSpec, zooKeeperEnsembleConnectionSpec, curatorFactory,
+ new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES));
}
- /**
- * Create a curator instance which connects to the zookeeper servers given by a connection spec
- * on the format "hostname1:port,hostname2:port" ...
- */
- public Curator(String connectionSpec) {
- Objects.requireNonNull(connectionSpec, "The curator connection spec cannot be null");
+ private Curator(String connectionSpec,
+ String zooKeeperEnsembleConnectionSpec,
+ Function<RetryPolicy, CuratorFramework> curatorFactory,
+ RetryPolicy retryPolicy) {
this.connectionSpec = connectionSpec;
- this.serverCount = connectionSpec.split(",").length;
- validateConnectionSpec(connectionSpec);
- retryPolicy = new ExponentialBackoffRetry(baseSleepTime, maxRetries);
- curatorFramework = CuratorFrameworkFactory.builder()
- .retryPolicy(retryPolicy)
- .sessionTimeoutMs(ZK_SESSION_TIMEOUT)
- .connectionTimeoutMs(ZK_CONNECTION_TIMEOUT)
- .connectString(connectionSpec)
- .zookeeperFactory(new DNSResolvingFixerZooKeeperFactory(UNKNOWN_HOST_TIMEOUT_MILLIS))
- .build();
- addFakeListener();
- curatorFramework.start();
+ this.retryPolicy = retryPolicy;
+ this.curatorFramework = curatorFactory.apply(retryPolicy);
+ if (this.curatorFramework != null) {
+ validateConnectionSpec(connectionSpec);
+ validateConnectionSpec(zooKeeperEnsembleConnectionSpec);
+ addFakeListener();
+ curatorFramework.start();
+ }
+
+ this.zooKeeperEnsembleConnectionSpec = zooKeeperEnsembleConnectionSpec;
+ this.zooKeeperEnsembleCount = zooKeeperEnsembleConnectionSpec.split(",").length;
}
- protected Curator() {
- this.connectionSpec = "";
- this.serverCount = 0;
- retryPolicy = new ExponentialBackoffRetry(baseSleepTime, maxRetries);
- curatorFramework = null;
+ static String createConnectionSpec(ConfigserverConfig config) {
+ StringBuilder connectionSpec = new StringBuilder();
+ for (int i = 0; i < config.zookeeperserver().size(); i++) {
+ if (connectionSpec.length() > 0) {
+ connectionSpec.append(',');
+ }
+ ConfigserverConfig.Zookeeperserver server = config.zookeeperserver(i);
+ connectionSpec.append(server.hostname());
+ connectionSpec.append(':');
+ connectionSpec.append(server.port());
+ }
+ return connectionSpec.toString();
}
private static void validateConnectionSpec(String connectionSpec) {
@@ -125,18 +133,19 @@ public class Curator implements AutoCloseable {
throw new IllegalArgumentException(String.format("Connections spec '%s' is not valid", connectionSpec));
}
- /** Returns the number of zooKeeper servers in this cluster */
- public int serverCount() { return serverCount; }
-
- /**
- * Returns the servers in this cluster as a comma-separated list of host:port strings.
+ /**
+ * Returns the ZooKeeper "connect string" used by curator: a comma-separated list of
+ * host:port of ZooKeeper endpoints to connect to. This may be a subset of
+ * zooKeeperEnsembleConnectionSpec() if there's some affinity, e.g. for
+ * performance reasons.
+ *
* This may be empty but never null
*/
public String connectionSpec() { return connectionSpec; }
/** For internal use; prefer creating a {@link CuratorCounter} */
public DistributedAtomicLong createAtomicCounter(String path) {
- return new DistributedAtomicLong(curatorFramework, path, new ExponentialBackoffRetry(baseSleepTime, maxRetries));
+ return new DistributedAtomicLong(curatorFramework, path, new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES));
}
/** For internal use; prefer creating a {@link com.yahoo.vespa.curator.recipes.CuratorLock} */
@@ -339,4 +348,19 @@ public class Curator implements AutoCloseable {
}
+ /**
+ * @return The non-null connect string containing all ZooKeeper servers in the ensemble.
+ * WARNING: This may be different from the servers this Curator may connect to.
+ * TODO: Move method out of this class.
+ */
+ public String zooKeeperEnsembleConnectionSpec() {
+ return zooKeeperEnsembleConnectionSpec;
+ }
+
+ /**
+ * Returns the number of zooKeeper servers in this ensemble.
+ * WARNING: This may be different from the number of servers this Curator may connect to.
+ * TODO: Move method out of this class.
+ */
+ public int zooKeeperEnsembleCount() { return zooKeeperEnsembleCount; }
}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
index ccd02e5c6d6..4013cf1d649 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/mock/MockCurator.java
@@ -7,8 +7,6 @@ import com.yahoo.collections.Pair;
import com.yahoo.concurrent.Lock;
import com.yahoo.concurrent.Locks;
import com.yahoo.path.Path;
-import static com.yahoo.vespa.curator.mock.MemoryFileSystem.Node;
-
import com.yahoo.vespa.curator.CompletionTimeoutException;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.curator.recipes.CuratorLockException;
@@ -86,6 +84,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import static com.yahoo.vespa.curator.mock.MemoryFileSystem.Node;
+
/**
* <p>A <b>non thread safe</b> mock of the curator API.
* The methods are implemented lazily, due to laziness.
@@ -106,7 +106,7 @@ public class MockCurator extends Curator {
private boolean shouldTimeoutOnEnter = false;
private int monotonicallyIncreasingNumber = 0;
private final boolean stableOrdering;
- private String connectionSpec = "";
+ private String zooKeeperEnsembleConnectionSpec = "";
private final Locks<String> locks = new Locks<>(Long.MAX_VALUE, TimeUnit.DAYS);
/** The file system used by this mock to store zookeeper files and directories */
@@ -133,6 +133,7 @@ public class MockCurator extends Curator {
* This is not what ZooKeeper does.
*/
public MockCurator(boolean stableOrdering) {
+ super("", "", (retryPolicy) -> null);
this.stableOrdering = stableOrdering;
curatorFramework = new MockCuratorFramework();
curatorFramework.start();
@@ -152,11 +153,18 @@ public class MockCurator extends Curator {
return Optional.ofNullable(atomicCounters.get(path));
}
- /** Assigns the connection string, which must be on the form host1:port,host2:port ... */
- public void setConnectionSpec(String connectionSpec) { this.connectionSpec = connectionSpec; }
+ /**
+ * Sets the ZooKeeper ensemble connection spec, which must be on the form
+ * host1:port,host2:port ...
+ */
+ public void setZooKeeperEnsembleConnectionSpec(String ensembleSpec) {
+ this.zooKeeperEnsembleConnectionSpec = ensembleSpec;
+ }
@Override
- public String connectionSpec() { return connectionSpec; }
+ public String zooKeeperEnsembleConnectionSpec() {
+ return zooKeeperEnsembleConnectionSpec;
+ }
// ----- Start of adaptor methods from Curator to the mock file system -----
@@ -368,7 +376,7 @@ public class MockCurator extends Curator {
public void notify(Path path, PathChildrenCacheEvent event) {
try {
// Snapshot directoryListeners in case notification leads to new directoryListeners added
- Set<Map.Entry<Path, PathChildrenCacheListener>>directoryLlistenerSnapshot = new HashSet<>(directoryListeners.entrySet());
+ Set<Map.Entry<Path, PathChildrenCacheListener>> directoryLlistenerSnapshot = new HashSet<>(directoryListeners.entrySet());
for (Map.Entry<Path, PathChildrenCacheListener> listener : directoryLlistenerSnapshot) {
if (path.isChildOf(listener.getKey()))
listener.getValue().childEvent(curatorFramework, event);
diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java
index 1899dcfe7cd..2fc4c2a7fc4 100644
--- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java
+++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorTest.java
@@ -2,7 +2,6 @@
package com.yahoo.vespa.curator;
import com.yahoo.cloud.config.ConfigserverConfig;
-import com.yahoo.net.HostName;
import org.apache.curator.test.TestingServer;
import org.junit.After;
import org.junit.Before;
@@ -60,7 +59,7 @@ public class CuratorTest {
public void require_that_curator_can_produce_spec() {
try (Curator curator = createCurator(createTestConfig())) {
assertThat(curator.connectionSpec(), is(spec1 + "," + spec2));
- assertThat(curator.serverCount(), is(2));
+ assertThat(curator.zooKeeperEnsembleCount(), is(2));
}
}
@@ -69,27 +68,10 @@ public class CuratorTest {
ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder();
builder.zookeeperserver(createZKBuilder("localhost", port1));
try (Curator curator = createCurator(new ConfigserverConfig(builder))) {
- assertThat(curator.serverCount(), is(1));
+ assertThat(curator.zooKeeperEnsembleCount(), is(1));
}
}
- @Test
- public void localhost_affinity() {
- String localhostHostName = "myhost";
- int localhostPort = 123;
- String localhostSpec = localhostHostName + ":" + localhostPort;
-
- ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder();
- builder.zookeeperserver(createZKBuilder(localhostHostName, localhostPort));
- builder.zookeeperserver(createZKBuilder("otherhost", 345));
- builder.zookeeperLocalhostAffinity(true);
- ConfigserverConfig config = new ConfigserverConfig(builder);
-
- HostName.setHostNameForTestingOnly(localhostHostName);
-
- assertThat(Curator.createConnectionSpec(config), is(localhostSpec));
- }
-
private ConfigserverConfig createTestConfig() {
ConfigserverConfig.Builder builder = new ConfigserverConfig.Builder();
builder.zookeeperserver(createZKBuilder("localhost", port1));