summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHÃ¥kon Hallingstad <hakon@yahooinc.com>2022-01-14 19:41:35 +0100
committerGitHub <noreply@github.com>2022-01-14 19:41:35 +0100
commit1d84bb26b3d7d59987c18bf3c3a49203e749610c (patch)
treefb1d6bae6b3a0f947485088337ed3a0859c3c447
parent85ab960e1969501577296edbb150510fb0c2c645 (diff)
parent810da357155a349884b862de87e18de87ed54b35 (diff)
Merge pull request #20819 from vespa-engine/revert-20801-jonmv/reapply-zk-changes-2
Revert "Jonmv/reapply zk changes 2"
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java6
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java5
-rw-r--r--config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java10
-rw-r--r--configdefinitions/src/vespa/zookeeper-server.def3
-rw-r--r--zookeeper-server/zookeeper-server-3.6.3/pom.xml9
-rw-r--r--zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java8
-rw-r--r--zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java45
-rw-r--r--zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java258
-rw-r--r--zookeeper-server/zookeeper-server-3.7.0/pom.xml9
-rw-r--r--zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java8
-rw-r--r--zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java47
-rw-r--r--zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java258
-rw-r--r--zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java11
-rw-r--r--zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java66
-rw-r--r--zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java2
-rw-r--r--zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java69
16 files changed, 151 insertions, 663 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java
index 0b99496a9b4..6e6f027b520 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java
@@ -50,12 +50,14 @@ public class ClusterControllerCluster extends AbstractConfigProducer<ClusterCont
public void getConfig(ZookeeperServerConfig.Builder builder) {
builder.clientPort(ZK_CLIENT_PORT);
builder.juteMaxBuffer(1024 * 1024); // 1 Mb should be more than enough for cluster controller
+ boolean oldQuorumExists = containerCluster.getContainers().stream() // More than half the previous hosts must be present in the new config for quorum to persist.
+ .filter(container -> previousHosts.contains(container.getHostName())) // Set intersection is symmetric.
+ .count() > previousHosts.size() / 2;
for (ClusterControllerContainer container : containerCluster.getContainers()) {
ZookeeperServerConfig.Server.Builder serverBuilder = new ZookeeperServerConfig.Server.Builder();
serverBuilder.hostname(container.getHostName());
serverBuilder.id(container.index());
- serverBuilder.joining( ! previousHosts.isEmpty() && ! previousHosts.contains(container.getHostName()));
- serverBuilder.retired(container.isRetired());
+ serverBuilder.joining(oldQuorumExists && ! previousHosts.contains(container.getHostName()));
builder.server(serverBuilder);
}
}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java
index 438e143bdfd..c4d420f2d44 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java
@@ -309,9 +309,8 @@ public final class ApplicationContainerCluster extends ContainerCluster<Applicat
ZookeeperServerConfig.Server.Builder serverBuilder = new ZookeeperServerConfig.Server.Builder();
serverBuilder.hostname(container.getHostName())
.id(container.index())
- .joining( ! previousHosts.isEmpty() &&
- ! previousHosts.contains(container.getHostName()))
- .retired(container.isRetired());
+ .joining(!previousHosts.isEmpty() &&
+ !previousHosts.contains(container.getHostName()));
builder.server(serverBuilder);
builder.dynamicReconfiguration(true);
}
diff --git a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java
index ba70b7493a2..10f883bdc75 100644
--- a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java
+++ b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java
@@ -2054,7 +2054,7 @@ public class ModelProvisioningTest {
assertTrue("Initial servers are not joining", config.build().server().stream().noneMatch(ZookeeperServerConfig.Server::joining));
}
{
- VespaModel nextModel = tester.createModel(Zone.defaultZone(), servicesXml.apply(3), true, false, false, 0, Optional.of(model), new DeployState.Builder(), "node-1-3-10-04", "node-1-3-10-03");
+ VespaModel nextModel = tester.createModel(Zone.defaultZone(), servicesXml.apply(5), true, false, false, 0, Optional.of(model), new DeployState.Builder());
ApplicationContainerCluster cluster = nextModel.getContainerClusters().get("zk");
ZookeeperServerConfig.Builder config = new ZookeeperServerConfig.Builder();
cluster.getContainers().forEach(c -> c.getConfig(config));
@@ -2067,14 +2067,6 @@ public class ModelProvisioningTest {
4, true),
config.build().server().stream().collect(Collectors.toMap(ZookeeperServerConfig.Server::id,
ZookeeperServerConfig.Server::joining)));
- assertEquals("Retired nodes are retired",
- Map.of(0, false,
- 1, true,
- 2, true,
- 3, false,
- 4, false),
- config.build().server().stream().collect(Collectors.toMap(ZookeeperServerConfig.Server::id,
- ZookeeperServerConfig.Server::retired)));
}
}
diff --git a/configdefinitions/src/vespa/zookeeper-server.def b/configdefinitions/src/vespa/zookeeper-server.def
index d80ccc4d042..04632ffd35f 100644
--- a/configdefinitions/src/vespa/zookeeper-server.def
+++ b/configdefinitions/src/vespa/zookeeper-server.def
@@ -32,13 +32,10 @@ juteMaxBuffer int default=52428800
myid int restart
server[].id int
server[].hostname string
-server[].clientPort int default=2181
server[].quorumPort int default=2182
server[].electionPort int default=2183
# Whether this server is joining an existing cluster
server[].joining bool default=false
-# Whether this server is retired, and about to be removed
-server[].retired bool default=false
# Needed when upgrading from ZooKeeper 3.4 to 3.5, see https://issues.apache.org/jira/browse/ZOOKEEPER-3056,
# and in general where there is a zookeeper ensemble running that has had few transactions.
diff --git a/zookeeper-server/zookeeper-server-3.6.3/pom.xml b/zookeeper-server/zookeeper-server-3.6.3/pom.xml
index a8ad183de4e..f7e6f512f7c 100644
--- a/zookeeper-server/zookeeper-server-3.6.3/pom.xml
+++ b/zookeeper-server/zookeeper-server-3.6.3/pom.xml
@@ -11,9 +11,6 @@
<artifactId>zookeeper-server-3.6.3</artifactId>
<packaging>container-plugin</packaging>
<version>7-SNAPSHOT</version>
- <properties>
- <zookeeper.version>3.6.3</zookeeper.version>
- </properties>
<dependencies>
<dependency>
<groupId>com.yahoo.vespa</groupId>
@@ -35,7 +32,7 @@
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
- <version>${zookeeper.version}</version>
+ <version>3.6.3</version>
<exclusions>
<!--
Container provides wiring for all common log libraries
@@ -90,6 +87,7 @@
<configuration>
<compilerArgs>
<arg>-Xlint:all</arg>
+ <arg>-Werror</arg>
</compilerArgs>
</configuration>
</plugin>
@@ -99,9 +97,6 @@
<configuration>
<redirectTestOutputToFile>${test.hide}</redirectTestOutputToFile>
<forkMode>once</forkMode>
- <systemPropertyVariables>
- <zk-version>${zookeeper.version}</zk-version>
- </systemPropertyVariables>
</configuration>
</plugin>
<plugin>
diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
index 246911fdfc7..c002ffa72ce 100644
--- a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
@@ -18,21 +18,21 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class ReconfigurableVespaZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer {
- private QuorumPeer peer;
+ private final AtomicReference<QuorumPeer> peer = new AtomicReference<>();
@Inject
public ReconfigurableVespaZooKeeperServer(Reconfigurer reconfigurer, ZookeeperServerConfig zookeeperServerConfig) {
- peer = reconfigurer.startOrReconfigure(zookeeperServerConfig, this, () -> peer = new VespaQuorumPeer());
+ reconfigurer.startOrReconfigure(zookeeperServerConfig, this, VespaQuorumPeer::new, peer::set);
}
@Override
public void shutdown() {
- peer.shutdown(Duration.ofMinutes(1));
+ peer.get().shutdown(Duration.ofMinutes(1));
}
@Override
public void start(Path configFilePath) {
- peer.start(configFilePath);
+ peer.get().start(configFilePath);
}
@Override
diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
index f0a95b70e96..27aa18c64c7 100644
--- a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
+++ b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
@@ -2,15 +2,11 @@
package com.yahoo.vespa.zookeeper;
import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
-import org.apache.zookeeper.data.ACL;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -23,28 +19,27 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin {
private static final Logger log = java.util.logging.Logger.getLogger(VespaZooKeeperAdminImpl.class.getName());
@Override
- public void reconfigure(String connectionSpec, String servers) throws ReconfigException {
- try (ZooKeeperAdmin zooKeeperAdmin = createAdmin(connectionSpec)) {
+ public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException {
+ ZooKeeperAdmin zooKeeperAdmin = null;
+ try {
+ zooKeeperAdmin = createAdmin(connectionSpec);
long fromConfig = -1;
// Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0)
- log.log(Level.INFO, "Applying ZooKeeper config: " + servers);
- byte[] appliedConfig = zooKeeperAdmin.reconfigure(null, null, servers, fromConfig, null);
+ byte[] appliedConfig = zooKeeperAdmin.reconfigure(joiningServers, leavingServers, null, fromConfig, null);
log.log(Level.INFO, "Applied ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8));
-
- // Verify by issuing a write operation; this is only accepted once new quorum is obtained.
- List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- String node = zooKeeperAdmin.create("/reconfigure-dummy-node", new byte[0], acl, CreateMode.EPHEMERAL_SEQUENTIAL);
- zooKeeperAdmin.delete(node, -1);
-
- log.log(Level.INFO, "Verified ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8));
- }
- catch ( KeeperException.ReconfigInProgress
- | KeeperException.ConnectionLossException
- | KeeperException.NewConfigNoQuorum e) {
- throw new ReconfigException(e);
- }
- catch (KeeperException | IOException | InterruptedException e) {
+ } catch (KeeperException e) {
+ if (retryOn(e))
+ throw new ReconfigException(e);
+ else
+ throw new RuntimeException(e);
+ } catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
+ } finally {
+ if (zooKeeperAdmin != null) {
+ try {
+ zooKeeperAdmin.close();
+ } catch (InterruptedException e) { /* ignore */}
+ }
}
}
@@ -53,5 +48,11 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin {
(event) -> log.log(Level.INFO, event.toString()), new ZkClientConfigBuilder().toConfig());
}
+ private static boolean retryOn(KeeperException e) {
+ return e instanceof KeeperException.ReconfigInProgress ||
+ e instanceof KeeperException.ConnectionLossException ||
+ e instanceof KeeperException.NewConfigNoQuorum;
+ }
+
}
diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java b/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java
deleted file mode 100644
index 922c389f94a..00000000000
--- a/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.zookeper;
-
-import com.yahoo.cloud.config.ZookeeperServerConfig;
-import com.yahoo.net.HostName;
-import com.yahoo.vespa.zookeeper.ReconfigurableVespaZooKeeperServer;
-import com.yahoo.vespa.zookeeper.Reconfigurer;
-import com.yahoo.vespa.zookeeper.VespaZooKeeperAdminImpl;
-import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.admin.ZooKeeperAdmin;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.net.ServerSocket;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.IntStream;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static java.util.stream.Collectors.toList;
-import static org.junit.Assert.assertEquals;
-
-public class VespaZooKeeperTest {
-
- static final Path tempDirRoot = getTmpDir();
- static final List<Integer> ports = new ArrayList<>();
-
- /**
- * Performs dynamic reconfiguration of ZooKeeper servers.
- *
- * First, a cluster of 3 servers is set up, and some data is written to it.
- * Then, 3 new servers are added, and the first 3 marked for retirement;
- * this should force the quorum to move the 3 new servers, but not disconnect the old ones.
- * Next, the old servers are removed.
- * Then, the cluster is reduced to size 1.
- * Finally, the cluster grows to size 3 again.
- *
- * Throughout all of this, quorum should remain, and the data should remain the same.
- */
- @Test(timeout = 120_000)
- @Ignore // Unstable, some ZK server keeps resetting connections sometimes.
- public void testReconfiguration() throws ExecutionException, InterruptedException, IOException, KeeperException, TimeoutException {
- List<ZooKeeper> keepers = new ArrayList<>();
- for (int i = 0; i < 8; i++) keepers.add(new ZooKeeper());
- for (int i = 0; i < 8; i++) keepers.get(i).run();
-
- // Start the first three servers.
- List<ZookeeperServerConfig> configs = getConfigs(0, 0, 3, 0);
- for (int i = 0; i < 3; i++) keepers.get(i).config = configs.get(i);
- for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Wait for all servers to be up and running.
- for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Write data to verify later.
- String path = writeData(configs.get(0));
-
- // Let three new servers join, causing the three older ones to retire and leave the ensemble.
- configs = getConfigs(0, 3, 3, 3);
- for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i);
- // The existing servers can't reconfigure and leave before the joiners are up.
- for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Wait for new quorum to be established.
- for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Verify written data is preserved.
- verifyData(path, configs.get(3));
-
- // Old servers are removed.
- configs = getConfigs(3, 0, 3, 0);
- for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i);
- // Old servers shut down, while the newer servers remain.
- for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- // Ensure old servers shut down properly.
- for (int i = 0; i < 3; i++) keepers.get(i).await();
- // Ensure new servers have reconfigured.
- for (int i = 3; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Verify written data is preserved.
- verifyData(path, configs.get(3));
-
-
- // Cluster shrinks to a single server.
- configs = getConfigs(5, 0, 1, 0);
- for (int i = 3; i < 6; i++) keepers.get(i).config = configs.get(i);
- for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- // We let the remaining server reconfigure the others out before they die.
- for (int i = 3; i < 5; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- for (int i = 3; i < 5; i++) keepers.get(i).await();
- verifyData(path, configs.get(5));
-
- // Cluster grows to 3 servers again.
- configs = getConfigs(5, 0, 3, 2);
- for (int i = 5; i < 8; i++) keepers.get(i).config = configs.get(i);
- for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- // Wait for the joiners.
- for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- verifyData(path, configs.get(7));
-
- // Let the remaining servers terminate.
- for (int i = 5; i < 8; i++) keepers.get(i).config = null;
- for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- for (int i = 5; i < 8; i++) keepers.get(i).await();
- }
-
- static String writeData(ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException {
- ZooKeeperAdmin admin = createAdmin(config);
- List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- String node = admin.create("/test-node", "hi".getBytes(UTF_8), acl, CreateMode.EPHEMERAL_SEQUENTIAL);
- String read = new String(admin.getData(node, false, new Stat()), UTF_8);
- assertEquals("hi", read);
- return node;
- }
-
- static void verifyData(String path, ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException {
- for (int i = 0; i < 10; i++) {
- try {
- assertEquals("hi", new String(createAdmin(config).getData(path, false, new Stat()), UTF_8));
- return;
- }
- catch (KeeperException.ConnectionLossException e) {
- e.printStackTrace();
- Thread.sleep(10 << i);
- }
- }
- }
-
- static ZooKeeperAdmin createAdmin(ZookeeperServerConfig config) throws IOException {
- return new ZooKeeperAdmin(HostName.getLocalhost() + ":" + config.clientPort(),
- 10_000,
- System.err::println,
- new ZkClientConfigBuilder().toConfig());
- }
-
- static class ZooKeeper {
-
- final ExecutorService executor = Executors.newSingleThreadExecutor();
- final Phaser phaser = new Phaser(2);
- final AtomicReference<Future<?>> future = new AtomicReference<>();
- ZookeeperServerConfig config;
-
- void run() {
- future.set(executor.submit(() -> {
- Reconfigurer reconfigurer = new Reconfigurer(new VespaZooKeeperAdminImpl());
- phaser.arriveAndAwaitAdvance();
- while (config != null) {
- new ReconfigurableVespaZooKeeperServer(reconfigurer, config);
- phaser.arriveAndAwaitAdvance(); // server is now up, let test thread sync here
- phaser.arriveAndAwaitAdvance(); // wait before reconfig/teardown to let test thread do stuff
- }
- reconfigurer.deconstruct();
- }));
- }
-
- void await() throws ExecutionException, InterruptedException, TimeoutException {
- future.get().get(30, SECONDS);
- }
- }
-
- static List<ZookeeperServerConfig> getConfigs(int removed, int retired, int active, int joining) {
- return IntStream.rangeClosed(1, removed + retired + active)
- .mapToObj(id -> getConfig(removed, retired, active, joining, id))
- .collect(toList());
- }
-
- // Config for server #id among retired + active servers, of which the last may be joining, and with offset removed.
- static ZookeeperServerConfig getConfig(int removed, int retired, int active, int joining, int id) {
- if (id <= removed)
- return null;
-
- Path tempDir = tempDirRoot.resolve("zookeeper-" + id);
- return new ZookeeperServerConfig.Builder()
- .clientPort(getPorts(id).get(0))
- .dataDir(tempDir.toString())
- .zooKeeperConfigFile(tempDir.resolve("zookeeper.cfg").toString())
- .myid(id)
- .myidFile(tempDir.resolve("myid").toString())
- .dynamicReconfiguration(true)
- .server(IntStream.rangeClosed(removed + 1, removed + retired + active)
- .mapToObj(i -> new ZookeeperServerConfig.Server.Builder()
- .id(i)
- .clientPort(getPorts(i).get(0))
- .electionPort(getPorts(i).get(1))
- .quorumPort(getPorts(i).get(2))
- .hostname("localhost")
- .joining(i - removed > retired + active - joining)
- .retired(i - removed <= retired))
- .collect(toList()))
- .build();
- }
-
- static List<Integer> getPorts(int id) {
- if (ports.size() < id * 3) {
- int previousPort;
- if (ports.isEmpty()) {
- String[] version = System.getProperty("zk-version").split("\\.");
- int versionPortOffset = 0;
- for (String part : version)
- versionPortOffset = 32 * (versionPortOffset + Integer.parseInt(part));
- previousPort = 20000 + versionPortOffset % 30000;
- }
- else
- previousPort = ports.get(ports.size() - 1);
-
- for (int i = 0; i < 3; i++)
- ports.add(previousPort = nextPort(previousPort));
- }
- return ports.subList(id * 3 - 3, id * 3);
- }
-
- static int nextPort(int previousPort) {
- for (int j = 1; j <= 30000; j++) {
- int port = (previousPort + j);
- while (port > 50000)
- port -= 30000;
-
- try (ServerSocket socket = new ServerSocket(port)) {
- return socket.getLocalPort();
- }
- catch (IOException e) {
- System.err.println("Could not bind port " + port + ": " + e);
- }
- }
- throw new RuntimeException("No free ports");
- }
-
- static Path getTmpDir() {
- try {
- Path tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "vespa-zk-test");
- tempDir.toFile().deleteOnExit();
- return tempDir.toAbsolutePath();
- }
- catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.7.0/pom.xml b/zookeeper-server/zookeeper-server-3.7.0/pom.xml
index 01fd83a496b..ac7db35e6af 100644
--- a/zookeeper-server/zookeeper-server-3.7.0/pom.xml
+++ b/zookeeper-server/zookeeper-server-3.7.0/pom.xml
@@ -11,9 +11,6 @@
<artifactId>zookeeper-server-3.7.0</artifactId>
<packaging>container-plugin</packaging>
<version>7-SNAPSHOT</version>
- <properties>
- <zookeeper.version>3.7.0</zookeeper.version>
- </properties>
<dependencies>
<dependency>
<groupId>com.yahoo.vespa</groupId>
@@ -35,7 +32,7 @@
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
- <version>${zookeeper.version}</version>
+ <version>3.7.0</version>
<exclusions>
<!--
Container provides wiring for all common log libraries
@@ -90,6 +87,7 @@
<configuration>
<compilerArgs>
<arg>-Xlint:all</arg>
+ <arg>-Werror</arg>
</compilerArgs>
</configuration>
</plugin>
@@ -99,9 +97,6 @@
<configuration>
<redirectTestOutputToFile>${test.hide}</redirectTestOutputToFile>
<forkMode>once</forkMode>
- <systemPropertyVariables>
- <zk-version>${zookeeper.version}</zk-version>
- </systemPropertyVariables>
</configuration>
</plugin>
<plugin>
diff --git a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
index 246911fdfc7..c002ffa72ce 100644
--- a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
@@ -18,21 +18,21 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class ReconfigurableVespaZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer {
- private QuorumPeer peer;
+ private final AtomicReference<QuorumPeer> peer = new AtomicReference<>();
@Inject
public ReconfigurableVespaZooKeeperServer(Reconfigurer reconfigurer, ZookeeperServerConfig zookeeperServerConfig) {
- peer = reconfigurer.startOrReconfigure(zookeeperServerConfig, this, () -> peer = new VespaQuorumPeer());
+ reconfigurer.startOrReconfigure(zookeeperServerConfig, this, VespaQuorumPeer::new, peer::set);
}
@Override
public void shutdown() {
- peer.shutdown(Duration.ofMinutes(1));
+ peer.get().shutdown(Duration.ofMinutes(1));
}
@Override
public void start(Path configFilePath) {
- peer.start(configFilePath);
+ peer.get().start(configFilePath);
}
@Override
diff --git a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
index ae7bf8d84f5..27aa18c64c7 100644
--- a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
+++ b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
@@ -2,15 +2,11 @@
package com.yahoo.vespa.zookeeper;
import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
-import org.apache.zookeeper.data.ACL;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -23,28 +19,27 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin {
private static final Logger log = java.util.logging.Logger.getLogger(VespaZooKeeperAdminImpl.class.getName());
@Override
- public void reconfigure(String connectionSpec, String servers) throws ReconfigException {
- try (ZooKeeperAdmin zooKeeperAdmin = createAdmin(connectionSpec)) {
+ public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException {
+ ZooKeeperAdmin zooKeeperAdmin = null;
+ try {
+ zooKeeperAdmin = createAdmin(connectionSpec);
long fromConfig = -1;
- // Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0).
- log.log(Level.INFO, "Applying ZooKeeper config: " + servers);
- byte[] appliedConfig = zooKeeperAdmin.reconfigure(null, null, servers, fromConfig, null);
+ // Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0)
+ byte[] appliedConfig = zooKeeperAdmin.reconfigure(joiningServers, leavingServers, null, fromConfig, null);
log.log(Level.INFO, "Applied ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8));
-
- // Verify by issuing a write operation; this is only accepted once new quorum is obtained.
- List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- String node = zooKeeperAdmin.create("/reconfigure-dummy-node", new byte[0], acl, CreateMode.EPHEMERAL_SEQUENTIAL);
- zooKeeperAdmin.delete(node, -1);
-
- log.log(Level.INFO, "Verified ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8));
- }
- catch ( KeeperException.ReconfigInProgress
- | KeeperException.ConnectionLossException
- | KeeperException.NewConfigNoQuorum e) {
- throw new ReconfigException(e);
- }
- catch (KeeperException | IOException | InterruptedException e) {
+ } catch (KeeperException e) {
+ if (retryOn(e))
+ throw new ReconfigException(e);
+ else
+ throw new RuntimeException(e);
+ } catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
+ } finally {
+ if (zooKeeperAdmin != null) {
+ try {
+ zooKeeperAdmin.close();
+ } catch (InterruptedException e) { /* ignore */}
+ }
}
}
@@ -53,5 +48,11 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin {
(event) -> log.log(Level.INFO, event.toString()), new ZkClientConfigBuilder().toConfig());
}
+ private static boolean retryOn(KeeperException e) {
+ return e instanceof KeeperException.ReconfigInProgress ||
+ e instanceof KeeperException.ConnectionLossException ||
+ e instanceof KeeperException.NewConfigNoQuorum;
+ }
+
}
diff --git a/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java b/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java
deleted file mode 100644
index 922c389f94a..00000000000
--- a/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.zookeper;
-
-import com.yahoo.cloud.config.ZookeeperServerConfig;
-import com.yahoo.net.HostName;
-import com.yahoo.vespa.zookeeper.ReconfigurableVespaZooKeeperServer;
-import com.yahoo.vespa.zookeeper.Reconfigurer;
-import com.yahoo.vespa.zookeeper.VespaZooKeeperAdminImpl;
-import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.admin.ZooKeeperAdmin;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.net.ServerSocket;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.IntStream;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static java.util.stream.Collectors.toList;
-import static org.junit.Assert.assertEquals;
-
-public class VespaZooKeeperTest {
-
- static final Path tempDirRoot = getTmpDir();
- static final List<Integer> ports = new ArrayList<>();
-
- /**
- * Performs dynamic reconfiguration of ZooKeeper servers.
- *
- * First, a cluster of 3 servers is set up, and some data is written to it.
- * Then, 3 new servers are added, and the first 3 marked for retirement;
- * this should force the quorum to move the 3 new servers, but not disconnect the old ones.
- * Next, the old servers are removed.
- * Then, the cluster is reduced to size 1.
- * Finally, the cluster grows to size 3 again.
- *
- * Throughout all of this, quorum should remain, and the data should remain the same.
- */
- @Test(timeout = 120_000)
- @Ignore // Unstable, some ZK server keeps resetting connections sometimes.
- public void testReconfiguration() throws ExecutionException, InterruptedException, IOException, KeeperException, TimeoutException {
- List<ZooKeeper> keepers = new ArrayList<>();
- for (int i = 0; i < 8; i++) keepers.add(new ZooKeeper());
- for (int i = 0; i < 8; i++) keepers.get(i).run();
-
- // Start the first three servers.
- List<ZookeeperServerConfig> configs = getConfigs(0, 0, 3, 0);
- for (int i = 0; i < 3; i++) keepers.get(i).config = configs.get(i);
- for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Wait for all servers to be up and running.
- for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Write data to verify later.
- String path = writeData(configs.get(0));
-
- // Let three new servers join, causing the three older ones to retire and leave the ensemble.
- configs = getConfigs(0, 3, 3, 3);
- for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i);
- // The existing servers can't reconfigure and leave before the joiners are up.
- for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Wait for new quorum to be established.
- for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Verify written data is preserved.
- verifyData(path, configs.get(3));
-
- // Old servers are removed.
- configs = getConfigs(3, 0, 3, 0);
- for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i);
- // Old servers shut down, while the newer servers remain.
- for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- // Ensure old servers shut down properly.
- for (int i = 0; i < 3; i++) keepers.get(i).await();
- // Ensure new servers have reconfigured.
- for (int i = 3; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Verify written data is preserved.
- verifyData(path, configs.get(3));
-
-
- // Cluster shrinks to a single server.
- configs = getConfigs(5, 0, 1, 0);
- for (int i = 3; i < 6; i++) keepers.get(i).config = configs.get(i);
- for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- // We let the remaining server reconfigure the others out before they die.
- for (int i = 3; i < 5; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- for (int i = 3; i < 5; i++) keepers.get(i).await();
- verifyData(path, configs.get(5));
-
- // Cluster grows to 3 servers again.
- configs = getConfigs(5, 0, 3, 2);
- for (int i = 5; i < 8; i++) keepers.get(i).config = configs.get(i);
- for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- // Wait for the joiners.
- for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- verifyData(path, configs.get(7));
-
- // Let the remaining servers terminate.
- for (int i = 5; i < 8; i++) keepers.get(i).config = null;
- for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- for (int i = 5; i < 8; i++) keepers.get(i).await();
- }
-
- static String writeData(ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException {
- ZooKeeperAdmin admin = createAdmin(config);
- List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- String node = admin.create("/test-node", "hi".getBytes(UTF_8), acl, CreateMode.EPHEMERAL_SEQUENTIAL);
- String read = new String(admin.getData(node, false, new Stat()), UTF_8);
- assertEquals("hi", read);
- return node;
- }
-
- static void verifyData(String path, ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException {
- for (int i = 0; i < 10; i++) {
- try {
- assertEquals("hi", new String(createAdmin(config).getData(path, false, new Stat()), UTF_8));
- return;
- }
- catch (KeeperException.ConnectionLossException e) {
- e.printStackTrace();
- Thread.sleep(10 << i);
- }
- }
- }
-
- static ZooKeeperAdmin createAdmin(ZookeeperServerConfig config) throws IOException {
- return new ZooKeeperAdmin(HostName.getLocalhost() + ":" + config.clientPort(),
- 10_000,
- System.err::println,
- new ZkClientConfigBuilder().toConfig());
- }
-
- static class ZooKeeper {
-
- final ExecutorService executor = Executors.newSingleThreadExecutor();
- final Phaser phaser = new Phaser(2);
- final AtomicReference<Future<?>> future = new AtomicReference<>();
- ZookeeperServerConfig config;
-
- void run() {
- future.set(executor.submit(() -> {
- Reconfigurer reconfigurer = new Reconfigurer(new VespaZooKeeperAdminImpl());
- phaser.arriveAndAwaitAdvance();
- while (config != null) {
- new ReconfigurableVespaZooKeeperServer(reconfigurer, config);
- phaser.arriveAndAwaitAdvance(); // server is now up, let test thread sync here
- phaser.arriveAndAwaitAdvance(); // wait before reconfig/teardown to let test thread do stuff
- }
- reconfigurer.deconstruct();
- }));
- }
-
- void await() throws ExecutionException, InterruptedException, TimeoutException {
- future.get().get(30, SECONDS);
- }
- }
-
- static List<ZookeeperServerConfig> getConfigs(int removed, int retired, int active, int joining) {
- return IntStream.rangeClosed(1, removed + retired + active)
- .mapToObj(id -> getConfig(removed, retired, active, joining, id))
- .collect(toList());
- }
-
- // Config for server #id among retired + active servers, of which the last may be joining, and with offset removed.
- static ZookeeperServerConfig getConfig(int removed, int retired, int active, int joining, int id) {
- if (id <= removed)
- return null;
-
- Path tempDir = tempDirRoot.resolve("zookeeper-" + id);
- return new ZookeeperServerConfig.Builder()
- .clientPort(getPorts(id).get(0))
- .dataDir(tempDir.toString())
- .zooKeeperConfigFile(tempDir.resolve("zookeeper.cfg").toString())
- .myid(id)
- .myidFile(tempDir.resolve("myid").toString())
- .dynamicReconfiguration(true)
- .server(IntStream.rangeClosed(removed + 1, removed + retired + active)
- .mapToObj(i -> new ZookeeperServerConfig.Server.Builder()
- .id(i)
- .clientPort(getPorts(i).get(0))
- .electionPort(getPorts(i).get(1))
- .quorumPort(getPorts(i).get(2))
- .hostname("localhost")
- .joining(i - removed > retired + active - joining)
- .retired(i - removed <= retired))
- .collect(toList()))
- .build();
- }
-
- static List<Integer> getPorts(int id) {
- if (ports.size() < id * 3) {
- int previousPort;
- if (ports.isEmpty()) {
- String[] version = System.getProperty("zk-version").split("\\.");
- int versionPortOffset = 0;
- for (String part : version)
- versionPortOffset = 32 * (versionPortOffset + Integer.parseInt(part));
- previousPort = 20000 + versionPortOffset % 30000;
- }
- else
- previousPort = ports.get(ports.size() - 1);
-
- for (int i = 0; i < 3; i++)
- ports.add(previousPort = nextPort(previousPort));
- }
- return ports.subList(id * 3 - 3, id * 3);
- }
-
- static int nextPort(int previousPort) {
- for (int j = 1; j <= 30000; j++) {
- int port = (previousPort + j);
- while (port > 50000)
- port -= 30000;
-
- try (ServerSocket socket = new ServerSocket(port)) {
- return socket.getLocalPort();
- }
- catch (IOException e) {
- System.err.println("Could not bind port " + port + ": " + e);
- }
- }
- throw new RuntimeException("No free ports");
- }
-
- static Path getTmpDir() {
- try {
- Path tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "vespa-zk-test");
- tempDir.toFile().deleteOnExit();
- return tempDir.toAbsolutePath();
- }
- catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java
index 8b22f658a94..39d0312915f 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java
@@ -86,7 +86,7 @@ public class Configurator {
sb.append("reconfigEnabled=true").append("\n");
sb.append("skipACL=yes").append("\n");
ensureThisServerIsRepresented(config.myid(), config.server());
- config.server().forEach(server -> sb.append(serverSpec(server, config.clientPort(), server.joining())).append("\n"));
+ config.server().forEach(server -> addServerToCfg(sb, server, config.clientPort()));
sb.append(new TlsQuorumConfig().createConfig(vespaTlsConfig));
sb.append(new TlsClientServerConfig().createConfig(vespaTlsConfig));
return sb.toString();
@@ -111,8 +111,7 @@ public class Configurator {
}
}
- static String serverSpec(ZookeeperServerConfig.Server server, int clientPort, boolean joining) {
- StringBuilder sb = new StringBuilder();
+ private void addServerToCfg(StringBuilder sb, ZookeeperServerConfig.Server server, int clientPort) {
sb.append("server.")
.append(server.id())
.append("=")
@@ -121,7 +120,7 @@ public class Configurator {
.append(server.quorumPort())
.append(":")
.append(server.electionPort());
- if (joining) {
+ if (server.joining()) {
// Servers that are joining an existing cluster must be marked as observers. Note that this will NOT
// actually make the server an observer, but prevent it from forming an ensemble independently of the
// existing cluster.
@@ -131,8 +130,8 @@ public class Configurator {
.append("observer");
}
sb.append(";")
- .append(server.clientPort());
- return sb.toString();
+ .append(clientPort)
+ .append("\n");
}
static List<String> zookeeperServerHostnames(ZookeeperServerConfig zookeeperServerConfig) {
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
index 604419c063d..d4223e4d815 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
@@ -10,14 +10,14 @@ import com.yahoo.yolean.Exceptions;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
-
-import static com.yahoo.vespa.zookeeper.Configurator.serverSpec;
-import static java.util.stream.Collectors.toList;
+import java.util.stream.Collectors;
/**
* Starts zookeeper server and supports reconfiguring zookeeper cluster. Keep this as a component
@@ -50,22 +50,17 @@ public class Reconfigurer extends AbstractComponent {
this.sleeper = Objects.requireNonNull(sleeper);
}
- @Override
- public void deconstruct() {
- shutdown();
- }
-
- QuorumPeer startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server,
- Supplier<QuorumPeer> quorumPeerCreator) {
+ void startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server,
+ Supplier<QuorumPeer> quorumPeerGetter, Consumer<QuorumPeer> quorumPeerSetter) {
if (zooKeeperRunner == null) {
- peer = quorumPeerCreator.get(); // Obtain the peer from the server. This will be shared with later servers.
+ peer = quorumPeerGetter.get(); // Obtain the peer from the server. This will be shared with later servers.
zooKeeperRunner = startServer(newConfig, server);
}
+ quorumPeerSetter.accept(peer);
- if (newConfig.dynamicReconfiguration()) {
+ if (shouldReconfigure(newConfig)) {
reconfigure(newConfig);
}
- return peer;
}
ZookeeperServerConfig activeConfig() {
@@ -78,30 +73,42 @@ public class Reconfigurer extends AbstractComponent {
}
}
+ private boolean shouldReconfigure(ZookeeperServerConfig newConfig) {
+ if (!newConfig.dynamicReconfiguration()) return false;
+ if (activeConfig == null) return false;
+ return !newConfig.equals(activeConfig());
+ }
+
private ZooKeeperRunner startServer(ZookeeperServerConfig zookeeperServerConfig, VespaZooKeeperServer server) {
ZooKeeperRunner runner = new ZooKeeperRunner(zookeeperServerConfig, server);
activeConfig = zookeeperServerConfig;
return runner;
}
- // TODO jonmv: read dynamic file, discard if old quorum impossible (config file + .dynamic.<id>)
- // TODO jonmv: if dynamic file, all unlisted servers are observers; otherwise joiners are observers
- // TODO jonmv: wrap Curator in Provider, for Curator shutdown
private void reconfigure(ZookeeperServerConfig newConfig) {
Instant reconfigTriggered = Instant.now();
- String newServers = String.join(",", servers(newConfig));
- log.log(Level.INFO, "Will reconfigure ZooKeeper cluster." +
+ // No point in trying to reconfigure if there is only one server in the new ensemble,
+ // the others will be shutdown or are about to be shutdown
+ if (newConfig.server().size() == 1) shutdownAndDie(Duration.ZERO);
+
+ List<String> newServers = difference(servers(newConfig), servers(activeConfig));
+ String leavingServerIds = String.join(",", serverIdsDifference(activeConfig, newConfig));
+ String joiningServersSpec = String.join(",", newServers);
+ leavingServerIds = leavingServerIds.isEmpty() ? null : leavingServerIds;
+ joiningServersSpec = joiningServersSpec.isEmpty() ? null : joiningServersSpec;
+ log.log(Level.INFO, "Will reconfigure ZooKeeper cluster. \nJoining servers: " + joiningServersSpec +
+ "\nleaving servers: " + leavingServerIds +
"\nServers in active config:" + servers(activeConfig) +
"\nServers in new config:" + servers(newConfig));
String connectionSpec = localConnectionSpec(activeConfig);
Instant now = Instant.now();
- Duration reconfigTimeout = reconfigTimeout(newConfig.server().size());
+ Duration reconfigTimeout = reconfigTimeout(newServers.size());
Instant end = now.plus(reconfigTimeout);
// Loop reconfiguring since we might need to wait until another reconfiguration is finished before we can succeed
for (int attempt = 1; now.isBefore(end); attempt++) {
try {
Instant reconfigStarted = Instant.now();
- vespaZooKeeperAdmin.reconfigure(connectionSpec, newServers);
+ vespaZooKeeperAdmin.reconfigure(connectionSpec, joiningServersSpec, leavingServerIds);
Instant reconfigEnded = Instant.now();
log.log(Level.INFO, "Reconfiguration completed in " +
Duration.between(reconfigTriggered, reconfigEnded) +
@@ -140,11 +147,24 @@ public class Reconfigurer extends AbstractComponent {
return HostName.getLocalhost() + ":" + config.clientPort();
}
+ private static List<String> serverIdsDifference(ZookeeperServerConfig oldConfig, ZookeeperServerConfig newConfig) {
+ return difference(servers(oldConfig), servers(newConfig)).stream()
+ .map(server -> server.substring(0, server.indexOf('=')))
+ .collect(Collectors.toList());
+ }
+
private static List<String> servers(ZookeeperServerConfig config) {
+ // See https://zookeeper.apache.org/doc/r3.6.3/zookeeperReconfig.html#sc_reconfig_clientport for format
return config.server().stream()
- .filter(server -> ! server.retired())
- .map(server -> serverSpec(server, config.clientPort(), false))
- .collect(toList());
+ .map(server -> server.id() + "=" + server.hostname() + ":" + server.quorumPort() + ":" +
+ server.electionPort() + ";" + config.clientPort())
+ .collect(Collectors.toList());
+ }
+
+ private static <T> List<T> difference(List<T> list1, List<T> list2) {
+ List<T> copy = new ArrayList<>(list1);
+ copy.removeAll(list2);
+ return copy;
}
}
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java
index 59c9628bcab..8809dca0def 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java
@@ -10,7 +10,7 @@ import java.time.Duration;
*/
public interface VespaZooKeeperAdmin {
- void reconfigure(String connectionSpec, String servers) throws ReconfigException;
+ void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException;
/* Timeout for connecting to ZooKeeper */
default Duration sessionTimeout() { return Duration.ofSeconds(30); }
diff --git a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java
index 760c326cf5d..1211624e3d6 100644
--- a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java
+++ b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java
@@ -17,6 +17,7 @@ import java.util.Arrays;
import java.util.concurrent.Phaser;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
/**
@@ -50,26 +51,31 @@ public class ReconfigurerTest {
ZookeeperServerConfig nextConfig = createConfig(5, true);
reconfigurer.startOrReconfigure(nextConfig);
assertEquals("node1:2181", reconfigurer.connectionSpec());
- assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181,server.3=node3:2182:2183;2181,server.4=node4:2182:2183;2181",
- reconfigurer.servers());
- assertEquals(2, reconfigurer.reconfigurations());
+ assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers());
+ assertNull("No servers are leaving", reconfigurer.leavingServers());
+ assertEquals(1, reconfigurer.reconfigurations());
+ assertSame(nextConfig, reconfigurer.activeConfig());
+
+ // No reconfiguration happens with same config
+ reconfigurer.startOrReconfigure(nextConfig);
+ assertEquals(1, reconfigurer.reconfigurations());
assertSame(nextConfig, reconfigurer.activeConfig());
// Cluster shrinks
nextConfig = createConfig(3, true);
reconfigurer.startOrReconfigure(nextConfig);
- assertEquals(3, reconfigurer.reconfigurations());
+ assertEquals(2, reconfigurer.reconfigurations());
assertEquals("node1:2181", reconfigurer.connectionSpec());
- assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181",
- reconfigurer.servers());
+ assertNull("No servers are joining", reconfigurer.joiningServers());
+ assertEquals("3,4", reconfigurer.leavingServers());
assertSame(nextConfig, reconfigurer.activeConfig());
// Cluster loses node1, but node3 joins. Indices are shuffled.
nextConfig = createConfig(3, true, 1);
reconfigurer.startOrReconfigure(nextConfig);
- assertEquals(4, reconfigurer.reconfigurations());
- assertEquals("server.0=node0:2182:2183;2181,server.1=node2:2182:2183;2181,server.2=node3:2182:2183;2181",
- reconfigurer.servers());
+ assertEquals(3, reconfigurer.reconfigurations());
+ assertEquals("1=node2:2182:2183;2181,2=node3:2182:2183;2181", reconfigurer.joiningServers());
+ assertEquals("1,2", reconfigurer.leavingServers());
assertSame(nextConfig, reconfigurer.activeConfig());
}
@@ -83,9 +89,9 @@ public class ReconfigurerTest {
ZookeeperServerConfig nextConfig = createConfig(5, true);
reconfigurer.startOrReconfigure(nextConfig);
assertEquals("node1:2181", reconfigurer.connectionSpec());
- assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181,server.3=node3:2182:2183;2181,server.4=node4:2182:2183;2181",
- reconfigurer.servers());
- assertEquals(2, reconfigurer.reconfigurations());
+ assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers());
+ assertNull("No servers are leaving", reconfigurer.leavingServers());
+ assertEquals(1, reconfigurer.reconfigurations());
assertSame(nextConfig, reconfigurer.activeConfig());
}
@@ -106,27 +112,24 @@ public class ReconfigurerTest {
reconfigurer.shutdown();
}
- private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration, int... retiredIndices) {
- Arrays.sort(retiredIndices);
+ private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration, int... skipIndices) {
+ Arrays.sort(skipIndices);
ZookeeperServerConfig.Builder builder = new ZookeeperServerConfig.Builder();
builder.zooKeeperConfigFile(cfgFile.getAbsolutePath());
builder.myidFile(idFile.getAbsolutePath());
for (int i = 0, index = 0; i < numberOfServers; i++, index++) {
- boolean retired = Arrays.binarySearch(retiredIndices, index) >= 0;
- if (retired) i--;
- builder.server(newServer(i, "node" + index, retired));
+ while (Arrays.binarySearch(skipIndices, index) >= 0) index++;
+ builder.server(newServer(i, "node" + index));
}
-
builder.myid(0);
builder.dynamicReconfiguration(dynamicReconfiguration);
return builder.build();
}
- private ZookeeperServerConfig.Server.Builder newServer(int id, String hostName, boolean retired) {
+ private ZookeeperServerConfig.Server.Builder newServer(int id, String hostName) {
ZookeeperServerConfig.Server.Builder builder = new ZookeeperServerConfig.Server.Builder();
builder.id(id);
builder.hostname(hostName);
- builder.retired(retired);
return builder;
}
@@ -139,7 +142,6 @@ public class ReconfigurerTest {
private static class TestableReconfigurer extends Reconfigurer implements VespaZooKeeperServer {
private final TestableVespaZooKeeperAdmin zooKeeperAdmin;
- private final Phaser phaser = new Phaser(2);
private QuorumPeer serverPeer;
TestableReconfigurer(TestableVespaZooKeeperAdmin zooKeeperAdmin) {
@@ -154,16 +156,19 @@ public class ReconfigurerTest {
}
void startOrReconfigure(ZookeeperServerConfig newConfig) {
- serverPeer = startOrReconfigure(newConfig, this, MockQuorumPeer::new);
- phaser.arriveAndDeregister();
+ startOrReconfigure(newConfig, this, MockQuorumPeer::new, peer -> serverPeer = peer);
}
String connectionSpec() {
return zooKeeperAdmin.connectionSpec;
}
- String servers() {
- return zooKeeperAdmin.servers;
+ String joiningServers() {
+ return zooKeeperAdmin.joiningServers;
+ }
+
+ String leavingServers() {
+ return zooKeeperAdmin.leavingServers;
}
int reconfigurations() {
@@ -172,14 +177,10 @@ public class ReconfigurerTest {
@Override
public void shutdown() {
- phaser.arriveAndAwaitAdvance();
serverPeer.shutdown(Duration.ofSeconds(1)); }
@Override
- public void start(Path configFilePath) {
- phaser.arriveAndAwaitAdvance();
- serverPeer.start(configFilePath);
- }
+ public void start(Path configFilePath) { serverPeer.start(configFilePath); }
@Override
public boolean reconfigurable() {
@@ -191,7 +192,8 @@ public class ReconfigurerTest {
private static class TestableVespaZooKeeperAdmin implements VespaZooKeeperAdmin {
String connectionSpec;
- String servers;
+ String joiningServers;
+ String leavingServers;
int reconfigurations = 0;
private int failures = 0;
@@ -203,11 +205,12 @@ public class ReconfigurerTest {
}
@Override
- public void reconfigure(String connectionSpec, String servers) throws ReconfigException {
+ public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException {
if (++attempts < failures)
throw new ReconfigException("Reconfig failed");
this.connectionSpec = connectionSpec;
- this.servers = servers;
+ this.joiningServers = joiningServers;
+ this.leavingServers = leavingServers;
this.reconfigurations++;
}