diff options
16 files changed, 595 insertions, 151 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java index 6e6f027b520..f93ee3ce9c4 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java @@ -50,14 +50,12 @@ public class ClusterControllerCluster extends AbstractConfigProducer<ClusterCont public void getConfig(ZookeeperServerConfig.Builder builder) { builder.clientPort(ZK_CLIENT_PORT); builder.juteMaxBuffer(1024 * 1024); // 1 Mb should be more than enough for cluster controller - boolean oldQuorumExists = containerCluster.getContainers().stream() // More than half the previous hosts must be present in the new config for quorum to persist. - .filter(container -> previousHosts.contains(container.getHostName())) // Set intersection is symmetric. - .count() > previousHosts.size() / 2; for (ClusterControllerContainer container : containerCluster.getContainers()) { ZookeeperServerConfig.Server.Builder serverBuilder = new ZookeeperServerConfig.Server.Builder(); serverBuilder.hostname(container.getHostName()); serverBuilder.id(container.index()); - serverBuilder.joining(oldQuorumExists && ! previousHosts.contains(container.getHostName())); + serverBuilder.joining( ! previousHosts.contains(container.getHostName())); + serverBuilder.retired(container.isRetired()); builder.server(serverBuilder); } } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java index c4d420f2d44..438e143bdfd 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java @@ -309,8 +309,9 @@ public final class ApplicationContainerCluster extends ContainerCluster<Applicat ZookeeperServerConfig.Server.Builder serverBuilder = new ZookeeperServerConfig.Server.Builder(); serverBuilder.hostname(container.getHostName()) .id(container.index()) - .joining(!previousHosts.isEmpty() && - !previousHosts.contains(container.getHostName())); + .joining( ! previousHosts.isEmpty() && + ! previousHosts.contains(container.getHostName())) + .retired(container.isRetired()); builder.server(serverBuilder); builder.dynamicReconfiguration(true); } diff --git a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java index 10f883bdc75..ba70b7493a2 100644 --- a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java +++ b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java @@ -2054,7 +2054,7 @@ public class ModelProvisioningTest { assertTrue("Initial servers are not joining", config.build().server().stream().noneMatch(ZookeeperServerConfig.Server::joining)); } { - VespaModel nextModel = tester.createModel(Zone.defaultZone(), servicesXml.apply(5), true, false, false, 0, Optional.of(model), new DeployState.Builder()); + VespaModel nextModel = tester.createModel(Zone.defaultZone(), servicesXml.apply(3), true, false, false, 0, Optional.of(model), new DeployState.Builder(), "node-1-3-10-04", "node-1-3-10-03"); ApplicationContainerCluster cluster = nextModel.getContainerClusters().get("zk"); ZookeeperServerConfig.Builder config = new ZookeeperServerConfig.Builder(); cluster.getContainers().forEach(c -> c.getConfig(config)); @@ -2067,6 +2067,14 @@ public class ModelProvisioningTest { 4, true), config.build().server().stream().collect(Collectors.toMap(ZookeeperServerConfig.Server::id, ZookeeperServerConfig.Server::joining))); + assertEquals("Retired nodes are retired", + Map.of(0, false, + 1, true, + 2, true, + 3, false, + 4, false), + config.build().server().stream().collect(Collectors.toMap(ZookeeperServerConfig.Server::id, + ZookeeperServerConfig.Server::retired))); } } diff --git a/configdefinitions/src/vespa/zookeeper-server.def b/configdefinitions/src/vespa/zookeeper-server.def index 04632ffd35f..d80ccc4d042 100644 --- a/configdefinitions/src/vespa/zookeeper-server.def +++ b/configdefinitions/src/vespa/zookeeper-server.def @@ -32,10 +32,13 @@ juteMaxBuffer int default=52428800 myid int restart server[].id int server[].hostname string +server[].clientPort int default=2181 server[].quorumPort int default=2182 server[].electionPort int default=2183 # Whether this server is joining an existing cluster server[].joining bool default=false +# Whether this server is retired, and about to be removed +server[].retired bool default=false # Needed when upgrading from ZooKeeper 3.4 to 3.5, see https://issues.apache.org/jira/browse/ZOOKEEPER-3056, # and in general where there is a zookeeper ensemble running that has had few transactions. diff --git a/zookeeper-server/zookeeper-server-3.6.3/pom.xml b/zookeeper-server/zookeeper-server-3.6.3/pom.xml index f7e6f512f7c..a8ad183de4e 100644 --- a/zookeeper-server/zookeeper-server-3.6.3/pom.xml +++ b/zookeeper-server/zookeeper-server-3.6.3/pom.xml @@ -11,6 +11,9 @@ <artifactId>zookeeper-server-3.6.3</artifactId> <packaging>container-plugin</packaging> <version>7-SNAPSHOT</version> + <properties> + <zookeeper.version>3.6.3</zookeeper.version> + </properties> <dependencies> <dependency> <groupId>com.yahoo.vespa</groupId> @@ -32,7 +35,7 @@ <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> - <version>3.6.3</version> + <version>${zookeeper.version}</version> <exclusions> <!-- Container provides wiring for all common log libraries @@ -87,7 +90,6 @@ <configuration> <compilerArgs> <arg>-Xlint:all</arg> - <arg>-Werror</arg> </compilerArgs> </configuration> </plugin> @@ -97,6 +99,9 @@ <configuration> <redirectTestOutputToFile>${test.hide}</redirectTestOutputToFile> <forkMode>once</forkMode> + <systemPropertyVariables> + <zk-version>${zookeeper.version}</zk-version> + </systemPropertyVariables> </configuration> </plugin> <plugin> diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java index c002ffa72ce..246911fdfc7 100644 --- a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java @@ -18,21 +18,21 @@ import java.util.concurrent.atomic.AtomicReference; */ public class ReconfigurableVespaZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer { - private final AtomicReference<QuorumPeer> peer = new AtomicReference<>(); + private QuorumPeer peer; @Inject public ReconfigurableVespaZooKeeperServer(Reconfigurer reconfigurer, ZookeeperServerConfig zookeeperServerConfig) { - reconfigurer.startOrReconfigure(zookeeperServerConfig, this, VespaQuorumPeer::new, peer::set); + peer = reconfigurer.startOrReconfigure(zookeeperServerConfig, this, () -> peer = new VespaQuorumPeer()); } @Override public void shutdown() { - peer.get().shutdown(Duration.ofMinutes(1)); + peer.shutdown(Duration.ofMinutes(1)); } @Override public void start(Path configFilePath) { - peer.get().start(configFilePath); + peer.start(configFilePath); } @Override diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java index 27aa18c64c7..f0a95b70e96 100644 --- a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java +++ b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java @@ -2,11 +2,15 @@ package com.yahoo.vespa.zookeeper; import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.admin.ZooKeeperAdmin; +import org.apache.zookeeper.data.ACL; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -19,27 +23,28 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin { private static final Logger log = java.util.logging.Logger.getLogger(VespaZooKeeperAdminImpl.class.getName()); @Override - public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException { - ZooKeeperAdmin zooKeeperAdmin = null; - try { - zooKeeperAdmin = createAdmin(connectionSpec); + public void reconfigure(String connectionSpec, String servers) throws ReconfigException { + try (ZooKeeperAdmin zooKeeperAdmin = createAdmin(connectionSpec)) { long fromConfig = -1; // Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0) - byte[] appliedConfig = zooKeeperAdmin.reconfigure(joiningServers, leavingServers, null, fromConfig, null); + log.log(Level.INFO, "Applying ZooKeeper config: " + servers); + byte[] appliedConfig = zooKeeperAdmin.reconfigure(null, null, servers, fromConfig, null); log.log(Level.INFO, "Applied ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8)); - } catch (KeeperException e) { - if (retryOn(e)) - throw new ReconfigException(e); - else - throw new RuntimeException(e); - } catch (IOException | InterruptedException e) { + + // Verify by issuing a write operation; this is only accepted once new quorum is obtained. + List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + String node = zooKeeperAdmin.create("/reconfigure-dummy-node", new byte[0], acl, CreateMode.EPHEMERAL_SEQUENTIAL); + zooKeeperAdmin.delete(node, -1); + + log.log(Level.INFO, "Verified ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8)); + } + catch ( KeeperException.ReconfigInProgress + | KeeperException.ConnectionLossException + | KeeperException.NewConfigNoQuorum e) { + throw new ReconfigException(e); + } + catch (KeeperException | IOException | InterruptedException e) { throw new RuntimeException(e); - } finally { - if (zooKeeperAdmin != null) { - try { - zooKeeperAdmin.close(); - } catch (InterruptedException e) { /* ignore */} - } } } @@ -48,11 +53,5 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin { (event) -> log.log(Level.INFO, event.toString()), new ZkClientConfigBuilder().toConfig()); } - private static boolean retryOn(KeeperException e) { - return e instanceof KeeperException.ReconfigInProgress || - e instanceof KeeperException.ConnectionLossException || - e instanceof KeeperException.NewConfigNoQuorum; - } - } diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java b/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java new file mode 100644 index 00000000000..405ec1365fa --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java @@ -0,0 +1,224 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.zookeper; + +import com.yahoo.cloud.config.ZookeeperServerConfig; +import com.yahoo.net.HostName; +import com.yahoo.vespa.zookeeper.ReconfigurableVespaZooKeeperServer; +import com.yahoo.vespa.zookeeper.Reconfigurer; +import com.yahoo.vespa.zookeeper.VespaZooKeeperAdminImpl; +import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.admin.ZooKeeperAdmin; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.junit.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; +import static org.junit.Assert.assertEquals; + +public class VespaZooKeeperTest { + + static final Path tempDirRoot = getTmpDir(); + + /** + * Performs dynamic reconfiguration of ZooKeeper servers. + * + * First, a cluster of 3 servers is set up, and some data is written to it. + * Then, 3 new servers are added, and the first 3 marked for retirement; + * this should force the quorum to move the 3 new servers, but not disconnect the old ones. + * Next, the old servers are removed. + * Then, the cluster is reduced to size 1. + * Finally, the cluster grows to size 3 again. + * + * Throughout all of this, quorum should remain, and the data should remain the same. + */ + @Test(timeout = 120_000) + public void testReconfiguration() throws ExecutionException, InterruptedException, IOException, KeeperException, TimeoutException { + List<ZooKeeper> keepers = new ArrayList<>(); + for (int i = 0; i < 8; i++) keepers.add(new ZooKeeper()); + for (int i = 0; i < 8; i++) keepers.get(i).run(); + + // Start the first three servers. + List<ZookeeperServerConfig> configs = getConfigs(0, 0, 3, 0); + for (int i = 0; i < 3; i++) keepers.get(i).config = configs.get(i); + for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Wait for all servers to be up and running. + for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Write data to verify later. + String path = writeData(configs.get(0)); + + // Let three new servers join, causing the three older ones to retire and leave the ensemble. + configs = getConfigs(0, 3, 3, 3); + for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i); + // The existing servers can't reconfigure and leave before the joiners are up. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Wait for new quorum to be established. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Verify written data is preserved. + verifyData(path, configs.get(3)); + + // Old servers are removed. + configs = getConfigs(3, 0, 3, 0); + for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i); + // Old servers shut down, while the newer servers remain. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // Ensure old servers shut down properly. + for (int i = 0; i < 3; i++) keepers.get(i).await(); + // Ensure new servers have reconfigured. + for (int i = 3; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Verify written data is preserved. + verifyData(path, configs.get(3)); + + + // Cluster shrinks to a single server. + configs = getConfigs(5, 0, 1, 0); + for (int i = 3; i < 6; i++) keepers.get(i).config = configs.get(i); + for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // We let the remaining server reconfigure the others out before they die. + for (int i = 3; i < 5; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 3; i < 5; i++) keepers.get(i).await(); + verifyData(path, configs.get(5)); + + // Cluster grows to 3 servers again. + configs = getConfigs(5, 0, 3, 2); + for (int i = 5; i < 8; i++) keepers.get(i).config = configs.get(i); + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // Wait for the joiners. + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + verifyData(path, configs.get(7)); + + // Let the remaining servers terminate. + for (int i = 5; i < 8; i++) keepers.get(i).config = null; + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 5; i < 8; i++) keepers.get(i).await(); + } + + static String writeData(ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException { + ZooKeeperAdmin admin = createAdmin(config); + List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + String node = admin.create("/test-node", "hi".getBytes(UTF_8), acl, CreateMode.EPHEMERAL_SEQUENTIAL); + String read = new String(admin.getData(node, false, new Stat()), UTF_8); + assertEquals("hi", read); + return node; + } + + static void verifyData(String path, ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException { + for (int i = 0; i < 10; i++) { + try { + assertEquals("hi", new String(createAdmin(config).getData(path, false, new Stat()), UTF_8)); + return; + } + catch (KeeperException.ConnectionLossException e) { + e.printStackTrace(); + Thread.sleep(10 << i); + } + } + } + + static ZooKeeperAdmin createAdmin(ZookeeperServerConfig config) throws IOException { + return new ZooKeeperAdmin(HostName.getLocalhost() + ":" + config.clientPort(), + 10_000, + System.err::println, + new ZkClientConfigBuilder().toConfig()); + } + + static class ZooKeeper { + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final Phaser phaser = new Phaser(2); + final AtomicReference<Future<?>> future = new AtomicReference<>(); + ZookeeperServerConfig config; + + void run() { + future.set(executor.submit(() -> { + Reconfigurer reconfigurer = new Reconfigurer(new VespaZooKeeperAdminImpl()); + phaser.arriveAndAwaitAdvance(); + while (config != null) { + new ReconfigurableVespaZooKeeperServer(reconfigurer, config); + phaser.arriveAndAwaitAdvance(); // server is now up, let test thread sync here + phaser.arriveAndAwaitAdvance(); // wait before reconfig/teardown to let test thread do stuff + } + reconfigurer.deconstruct(); + })); + } + + void await() throws ExecutionException, InterruptedException, TimeoutException { + future.get().get(30, SECONDS); + } + } + + static List<ZookeeperServerConfig> getConfigs(int removed, int retired, int active, int joining) { + return IntStream.rangeClosed(1, removed + retired + active) + .mapToObj(id -> getConfig(removed, retired, active, joining, id)) + .collect(toList()); + } + + // Config for server #id among retired + active servers, of which the last may be joining, and with offset removed. + static ZookeeperServerConfig getConfig(int removed, int retired, int active, int joining, int id) { + if (id <= removed) + return null; + + Path tempDir = tempDirRoot.resolve("zookeeper-" + id); + String[] version = System.getProperty("zk-version").split("\\."); + int versionPortOffset = 0; + for (String part : version) + versionPortOffset = 32 * (versionPortOffset + Integer.parseInt(part)); + int port = 51000 + versionPortOffset % 9785; + return new ZookeeperServerConfig.Builder() + .clientPort(port + 3 * id) + .dataDir(tempDir.toString()) + .zooKeeperConfigFile(tempDir.resolve("zookeeper.cfg").toString()) + .myid(id) + .myidFile(tempDir.resolve("myid").toString()) + .dynamicReconfiguration(true) + .server(IntStream.rangeClosed(removed + 1, removed + retired + active) + .mapToObj(i -> new ZookeeperServerConfig.Server.Builder() + .id(i) + .clientPort(port + 3 * i) + .electionPort(port + 3 * i + 1) + .quorumPort(port + 3 * i + 2) + .hostname("localhost") + .joining(i - removed > retired + active - joining) + .retired(i - removed <= retired)) + .collect(toList())) + .build(); + } + + static Path getTmpDir() { + try { + Path tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "vespa-zk-test"); + tempDir.toFile().deleteOnExit(); + return tempDir.toAbsolutePath(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + +} diff --git a/zookeeper-server/zookeeper-server-3.7.0/pom.xml b/zookeeper-server/zookeeper-server-3.7.0/pom.xml index ac7db35e6af..01fd83a496b 100644 --- a/zookeeper-server/zookeeper-server-3.7.0/pom.xml +++ b/zookeeper-server/zookeeper-server-3.7.0/pom.xml @@ -11,6 +11,9 @@ <artifactId>zookeeper-server-3.7.0</artifactId> <packaging>container-plugin</packaging> <version>7-SNAPSHOT</version> + <properties> + <zookeeper.version>3.7.0</zookeeper.version> + </properties> <dependencies> <dependency> <groupId>com.yahoo.vespa</groupId> @@ -32,7 +35,7 @@ <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> - <version>3.7.0</version> + <version>${zookeeper.version}</version> <exclusions> <!-- Container provides wiring for all common log libraries @@ -87,7 +90,6 @@ <configuration> <compilerArgs> <arg>-Xlint:all</arg> - <arg>-Werror</arg> </compilerArgs> </configuration> </plugin> @@ -97,6 +99,9 @@ <configuration> <redirectTestOutputToFile>${test.hide}</redirectTestOutputToFile> <forkMode>once</forkMode> + <systemPropertyVariables> + <zk-version>${zookeeper.version}</zk-version> + </systemPropertyVariables> </configuration> </plugin> <plugin> diff --git a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java index c002ffa72ce..246911fdfc7 100644 --- a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java @@ -18,21 +18,21 @@ import java.util.concurrent.atomic.AtomicReference; */ public class ReconfigurableVespaZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer { - private final AtomicReference<QuorumPeer> peer = new AtomicReference<>(); + private QuorumPeer peer; @Inject public ReconfigurableVespaZooKeeperServer(Reconfigurer reconfigurer, ZookeeperServerConfig zookeeperServerConfig) { - reconfigurer.startOrReconfigure(zookeeperServerConfig, this, VespaQuorumPeer::new, peer::set); + peer = reconfigurer.startOrReconfigure(zookeeperServerConfig, this, () -> peer = new VespaQuorumPeer()); } @Override public void shutdown() { - peer.get().shutdown(Duration.ofMinutes(1)); + peer.shutdown(Duration.ofMinutes(1)); } @Override public void start(Path configFilePath) { - peer.get().start(configFilePath); + peer.start(configFilePath); } @Override diff --git a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java index 27aa18c64c7..ae7bf8d84f5 100644 --- a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java +++ b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java @@ -2,11 +2,15 @@ package com.yahoo.vespa.zookeeper; import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.admin.ZooKeeperAdmin; +import org.apache.zookeeper.data.ACL; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -19,27 +23,28 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin { private static final Logger log = java.util.logging.Logger.getLogger(VespaZooKeeperAdminImpl.class.getName()); @Override - public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException { - ZooKeeperAdmin zooKeeperAdmin = null; - try { - zooKeeperAdmin = createAdmin(connectionSpec); + public void reconfigure(String connectionSpec, String servers) throws ReconfigException { + try (ZooKeeperAdmin zooKeeperAdmin = createAdmin(connectionSpec)) { long fromConfig = -1; - // Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0) - byte[] appliedConfig = zooKeeperAdmin.reconfigure(joiningServers, leavingServers, null, fromConfig, null); + // Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0). + log.log(Level.INFO, "Applying ZooKeeper config: " + servers); + byte[] appliedConfig = zooKeeperAdmin.reconfigure(null, null, servers, fromConfig, null); log.log(Level.INFO, "Applied ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8)); - } catch (KeeperException e) { - if (retryOn(e)) - throw new ReconfigException(e); - else - throw new RuntimeException(e); - } catch (IOException | InterruptedException e) { + + // Verify by issuing a write operation; this is only accepted once new quorum is obtained. + List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + String node = zooKeeperAdmin.create("/reconfigure-dummy-node", new byte[0], acl, CreateMode.EPHEMERAL_SEQUENTIAL); + zooKeeperAdmin.delete(node, -1); + + log.log(Level.INFO, "Verified ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8)); + } + catch ( KeeperException.ReconfigInProgress + | KeeperException.ConnectionLossException + | KeeperException.NewConfigNoQuorum e) { + throw new ReconfigException(e); + } + catch (KeeperException | IOException | InterruptedException e) { throw new RuntimeException(e); - } finally { - if (zooKeeperAdmin != null) { - try { - zooKeeperAdmin.close(); - } catch (InterruptedException e) { /* ignore */} - } } } @@ -48,11 +53,5 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin { (event) -> log.log(Level.INFO, event.toString()), new ZkClientConfigBuilder().toConfig()); } - private static boolean retryOn(KeeperException e) { - return e instanceof KeeperException.ReconfigInProgress || - e instanceof KeeperException.ConnectionLossException || - e instanceof KeeperException.NewConfigNoQuorum; - } - } diff --git a/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java b/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java new file mode 100644 index 00000000000..405ec1365fa --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java @@ -0,0 +1,224 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.zookeper; + +import com.yahoo.cloud.config.ZookeeperServerConfig; +import com.yahoo.net.HostName; +import com.yahoo.vespa.zookeeper.ReconfigurableVespaZooKeeperServer; +import com.yahoo.vespa.zookeeper.Reconfigurer; +import com.yahoo.vespa.zookeeper.VespaZooKeeperAdminImpl; +import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.admin.ZooKeeperAdmin; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.junit.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; +import static org.junit.Assert.assertEquals; + +public class VespaZooKeeperTest { + + static final Path tempDirRoot = getTmpDir(); + + /** + * Performs dynamic reconfiguration of ZooKeeper servers. + * + * First, a cluster of 3 servers is set up, and some data is written to it. + * Then, 3 new servers are added, and the first 3 marked for retirement; + * this should force the quorum to move the 3 new servers, but not disconnect the old ones. + * Next, the old servers are removed. + * Then, the cluster is reduced to size 1. + * Finally, the cluster grows to size 3 again. + * + * Throughout all of this, quorum should remain, and the data should remain the same. + */ + @Test(timeout = 120_000) + public void testReconfiguration() throws ExecutionException, InterruptedException, IOException, KeeperException, TimeoutException { + List<ZooKeeper> keepers = new ArrayList<>(); + for (int i = 0; i < 8; i++) keepers.add(new ZooKeeper()); + for (int i = 0; i < 8; i++) keepers.get(i).run(); + + // Start the first three servers. + List<ZookeeperServerConfig> configs = getConfigs(0, 0, 3, 0); + for (int i = 0; i < 3; i++) keepers.get(i).config = configs.get(i); + for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Wait for all servers to be up and running. + for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Write data to verify later. + String path = writeData(configs.get(0)); + + // Let three new servers join, causing the three older ones to retire and leave the ensemble. + configs = getConfigs(0, 3, 3, 3); + for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i); + // The existing servers can't reconfigure and leave before the joiners are up. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Wait for new quorum to be established. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Verify written data is preserved. + verifyData(path, configs.get(3)); + + // Old servers are removed. + configs = getConfigs(3, 0, 3, 0); + for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i); + // Old servers shut down, while the newer servers remain. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // Ensure old servers shut down properly. + for (int i = 0; i < 3; i++) keepers.get(i).await(); + // Ensure new servers have reconfigured. + for (int i = 3; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Verify written data is preserved. + verifyData(path, configs.get(3)); + + + // Cluster shrinks to a single server. + configs = getConfigs(5, 0, 1, 0); + for (int i = 3; i < 6; i++) keepers.get(i).config = configs.get(i); + for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // We let the remaining server reconfigure the others out before they die. + for (int i = 3; i < 5; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 3; i < 5; i++) keepers.get(i).await(); + verifyData(path, configs.get(5)); + + // Cluster grows to 3 servers again. + configs = getConfigs(5, 0, 3, 2); + for (int i = 5; i < 8; i++) keepers.get(i).config = configs.get(i); + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // Wait for the joiners. + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + verifyData(path, configs.get(7)); + + // Let the remaining servers terminate. + for (int i = 5; i < 8; i++) keepers.get(i).config = null; + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 5; i < 8; i++) keepers.get(i).await(); + } + + static String writeData(ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException { + ZooKeeperAdmin admin = createAdmin(config); + List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + String node = admin.create("/test-node", "hi".getBytes(UTF_8), acl, CreateMode.EPHEMERAL_SEQUENTIAL); + String read = new String(admin.getData(node, false, new Stat()), UTF_8); + assertEquals("hi", read); + return node; + } + + static void verifyData(String path, ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException { + for (int i = 0; i < 10; i++) { + try { + assertEquals("hi", new String(createAdmin(config).getData(path, false, new Stat()), UTF_8)); + return; + } + catch (KeeperException.ConnectionLossException e) { + e.printStackTrace(); + Thread.sleep(10 << i); + } + } + } + + static ZooKeeperAdmin createAdmin(ZookeeperServerConfig config) throws IOException { + return new ZooKeeperAdmin(HostName.getLocalhost() + ":" + config.clientPort(), + 10_000, + System.err::println, + new ZkClientConfigBuilder().toConfig()); + } + + static class ZooKeeper { + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final Phaser phaser = new Phaser(2); + final AtomicReference<Future<?>> future = new AtomicReference<>(); + ZookeeperServerConfig config; + + void run() { + future.set(executor.submit(() -> { + Reconfigurer reconfigurer = new Reconfigurer(new VespaZooKeeperAdminImpl()); + phaser.arriveAndAwaitAdvance(); + while (config != null) { + new ReconfigurableVespaZooKeeperServer(reconfigurer, config); + phaser.arriveAndAwaitAdvance(); // server is now up, let test thread sync here + phaser.arriveAndAwaitAdvance(); // wait before reconfig/teardown to let test thread do stuff + } + reconfigurer.deconstruct(); + })); + } + + void await() throws ExecutionException, InterruptedException, TimeoutException { + future.get().get(30, SECONDS); + } + } + + static List<ZookeeperServerConfig> getConfigs(int removed, int retired, int active, int joining) { + return IntStream.rangeClosed(1, removed + retired + active) + .mapToObj(id -> getConfig(removed, retired, active, joining, id)) + .collect(toList()); + } + + // Config for server #id among retired + active servers, of which the last may be joining, and with offset removed. + static ZookeeperServerConfig getConfig(int removed, int retired, int active, int joining, int id) { + if (id <= removed) + return null; + + Path tempDir = tempDirRoot.resolve("zookeeper-" + id); + String[] version = System.getProperty("zk-version").split("\\."); + int versionPortOffset = 0; + for (String part : version) + versionPortOffset = 32 * (versionPortOffset + Integer.parseInt(part)); + int port = 51000 + versionPortOffset % 9785; + return new ZookeeperServerConfig.Builder() + .clientPort(port + 3 * id) + .dataDir(tempDir.toString()) + .zooKeeperConfigFile(tempDir.resolve("zookeeper.cfg").toString()) + .myid(id) + .myidFile(tempDir.resolve("myid").toString()) + .dynamicReconfiguration(true) + .server(IntStream.rangeClosed(removed + 1, removed + retired + active) + .mapToObj(i -> new ZookeeperServerConfig.Server.Builder() + .id(i) + .clientPort(port + 3 * i) + .electionPort(port + 3 * i + 1) + .quorumPort(port + 3 * i + 2) + .hostname("localhost") + .joining(i - removed > retired + active - joining) + .retired(i - removed <= retired)) + .collect(toList())) + .build(); + } + + static Path getTmpDir() { + try { + Path tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "vespa-zk-test"); + tempDir.toFile().deleteOnExit(); + return tempDir.toAbsolutePath(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + +} diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java index 39d0312915f..8b22f658a94 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java @@ -86,7 +86,7 @@ public class Configurator { sb.append("reconfigEnabled=true").append("\n"); sb.append("skipACL=yes").append("\n"); ensureThisServerIsRepresented(config.myid(), config.server()); - config.server().forEach(server -> addServerToCfg(sb, server, config.clientPort())); + config.server().forEach(server -> sb.append(serverSpec(server, config.clientPort(), server.joining())).append("\n")); sb.append(new TlsQuorumConfig().createConfig(vespaTlsConfig)); sb.append(new TlsClientServerConfig().createConfig(vespaTlsConfig)); return sb.toString(); @@ -111,7 +111,8 @@ public class Configurator { } } - private void addServerToCfg(StringBuilder sb, ZookeeperServerConfig.Server server, int clientPort) { + static String serverSpec(ZookeeperServerConfig.Server server, int clientPort, boolean joining) { + StringBuilder sb = new StringBuilder(); sb.append("server.") .append(server.id()) .append("=") @@ -120,7 +121,7 @@ public class Configurator { .append(server.quorumPort()) .append(":") .append(server.electionPort()); - if (server.joining()) { + if (joining) { // Servers that are joining an existing cluster must be marked as observers. Note that this will NOT // actually make the server an observer, but prevent it from forming an ensemble independently of the // existing cluster. @@ -130,8 +131,8 @@ public class Configurator { .append("observer"); } sb.append(";") - .append(clientPort) - .append("\n"); + .append(server.clientPort()); + return sb.toString(); } static List<String> zookeeperServerHostnames(ZookeeperServerConfig zookeeperServerConfig) { diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java index d4223e4d815..604419c063d 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java @@ -10,14 +10,14 @@ import com.yahoo.yolean.Exceptions; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.function.Consumer; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; -import java.util.stream.Collectors; + +import static com.yahoo.vespa.zookeeper.Configurator.serverSpec; +import static java.util.stream.Collectors.toList; /** * Starts zookeeper server and supports reconfiguring zookeeper cluster. Keep this as a component @@ -50,17 +50,22 @@ public class Reconfigurer extends AbstractComponent { this.sleeper = Objects.requireNonNull(sleeper); } - void startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server, - Supplier<QuorumPeer> quorumPeerGetter, Consumer<QuorumPeer> quorumPeerSetter) { + @Override + public void deconstruct() { + shutdown(); + } + + QuorumPeer startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server, + Supplier<QuorumPeer> quorumPeerCreator) { if (zooKeeperRunner == null) { - peer = quorumPeerGetter.get(); // Obtain the peer from the server. This will be shared with later servers. + peer = quorumPeerCreator.get(); // Obtain the peer from the server. This will be shared with later servers. zooKeeperRunner = startServer(newConfig, server); } - quorumPeerSetter.accept(peer); - if (shouldReconfigure(newConfig)) { + if (newConfig.dynamicReconfiguration()) { reconfigure(newConfig); } + return peer; } ZookeeperServerConfig activeConfig() { @@ -73,42 +78,30 @@ public class Reconfigurer extends AbstractComponent { } } - private boolean shouldReconfigure(ZookeeperServerConfig newConfig) { - if (!newConfig.dynamicReconfiguration()) return false; - if (activeConfig == null) return false; - return !newConfig.equals(activeConfig()); - } - private ZooKeeperRunner startServer(ZookeeperServerConfig zookeeperServerConfig, VespaZooKeeperServer server) { ZooKeeperRunner runner = new ZooKeeperRunner(zookeeperServerConfig, server); activeConfig = zookeeperServerConfig; return runner; } + // TODO jonmv: read dynamic file, discard if old quorum impossible (config file + .dynamic.<id>) + // TODO jonmv: if dynamic file, all unlisted servers are observers; otherwise joiners are observers + // TODO jonmv: wrap Curator in Provider, for Curator shutdown private void reconfigure(ZookeeperServerConfig newConfig) { Instant reconfigTriggered = Instant.now(); - // No point in trying to reconfigure if there is only one server in the new ensemble, - // the others will be shutdown or are about to be shutdown - if (newConfig.server().size() == 1) shutdownAndDie(Duration.ZERO); - - List<String> newServers = difference(servers(newConfig), servers(activeConfig)); - String leavingServerIds = String.join(",", serverIdsDifference(activeConfig, newConfig)); - String joiningServersSpec = String.join(",", newServers); - leavingServerIds = leavingServerIds.isEmpty() ? null : leavingServerIds; - joiningServersSpec = joiningServersSpec.isEmpty() ? null : joiningServersSpec; - log.log(Level.INFO, "Will reconfigure ZooKeeper cluster. \nJoining servers: " + joiningServersSpec + - "\nleaving servers: " + leavingServerIds + + String newServers = String.join(",", servers(newConfig)); + log.log(Level.INFO, "Will reconfigure ZooKeeper cluster." + "\nServers in active config:" + servers(activeConfig) + "\nServers in new config:" + servers(newConfig)); String connectionSpec = localConnectionSpec(activeConfig); Instant now = Instant.now(); - Duration reconfigTimeout = reconfigTimeout(newServers.size()); + Duration reconfigTimeout = reconfigTimeout(newConfig.server().size()); Instant end = now.plus(reconfigTimeout); // Loop reconfiguring since we might need to wait until another reconfiguration is finished before we can succeed for (int attempt = 1; now.isBefore(end); attempt++) { try { Instant reconfigStarted = Instant.now(); - vespaZooKeeperAdmin.reconfigure(connectionSpec, joiningServersSpec, leavingServerIds); + vespaZooKeeperAdmin.reconfigure(connectionSpec, newServers); Instant reconfigEnded = Instant.now(); log.log(Level.INFO, "Reconfiguration completed in " + Duration.between(reconfigTriggered, reconfigEnded) + @@ -147,24 +140,11 @@ public class Reconfigurer extends AbstractComponent { return HostName.getLocalhost() + ":" + config.clientPort(); } - private static List<String> serverIdsDifference(ZookeeperServerConfig oldConfig, ZookeeperServerConfig newConfig) { - return difference(servers(oldConfig), servers(newConfig)).stream() - .map(server -> server.substring(0, server.indexOf('='))) - .collect(Collectors.toList()); - } - private static List<String> servers(ZookeeperServerConfig config) { - // See https://zookeeper.apache.org/doc/r3.6.3/zookeeperReconfig.html#sc_reconfig_clientport for format return config.server().stream() - .map(server -> server.id() + "=" + server.hostname() + ":" + server.quorumPort() + ":" + - server.electionPort() + ";" + config.clientPort()) - .collect(Collectors.toList()); - } - - private static <T> List<T> difference(List<T> list1, List<T> list2) { - List<T> copy = new ArrayList<>(list1); - copy.removeAll(list2); - return copy; + .filter(server -> ! server.retired()) + .map(server -> serverSpec(server, config.clientPort(), false)) + .collect(toList()); } } diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java index 8809dca0def..59c9628bcab 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java @@ -10,7 +10,7 @@ import java.time.Duration; */ public interface VespaZooKeeperAdmin { - void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException; + void reconfigure(String connectionSpec, String servers) throws ReconfigException; /* Timeout for connecting to ZooKeeper */ default Duration sessionTimeout() { return Duration.ofSeconds(30); } diff --git a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java index 1211624e3d6..760c326cf5d 100644 --- a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java +++ b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java @@ -17,7 +17,6 @@ import java.util.Arrays; import java.util.concurrent.Phaser; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; /** @@ -51,31 +50,26 @@ public class ReconfigurerTest { ZookeeperServerConfig nextConfig = createConfig(5, true); reconfigurer.startOrReconfigure(nextConfig); assertEquals("node1:2181", reconfigurer.connectionSpec()); - assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers()); - assertNull("No servers are leaving", reconfigurer.leavingServers()); - assertEquals(1, reconfigurer.reconfigurations()); - assertSame(nextConfig, reconfigurer.activeConfig()); - - // No reconfiguration happens with same config - reconfigurer.startOrReconfigure(nextConfig); - assertEquals(1, reconfigurer.reconfigurations()); + assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181,server.3=node3:2182:2183;2181,server.4=node4:2182:2183;2181", + reconfigurer.servers()); + assertEquals(2, reconfigurer.reconfigurations()); assertSame(nextConfig, reconfigurer.activeConfig()); // Cluster shrinks nextConfig = createConfig(3, true); reconfigurer.startOrReconfigure(nextConfig); - assertEquals(2, reconfigurer.reconfigurations()); + assertEquals(3, reconfigurer.reconfigurations()); assertEquals("node1:2181", reconfigurer.connectionSpec()); - assertNull("No servers are joining", reconfigurer.joiningServers()); - assertEquals("3,4", reconfigurer.leavingServers()); + assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181", + reconfigurer.servers()); assertSame(nextConfig, reconfigurer.activeConfig()); // Cluster loses node1, but node3 joins. Indices are shuffled. nextConfig = createConfig(3, true, 1); reconfigurer.startOrReconfigure(nextConfig); - assertEquals(3, reconfigurer.reconfigurations()); - assertEquals("1=node2:2182:2183;2181,2=node3:2182:2183;2181", reconfigurer.joiningServers()); - assertEquals("1,2", reconfigurer.leavingServers()); + assertEquals(4, reconfigurer.reconfigurations()); + assertEquals("server.0=node0:2182:2183;2181,server.1=node2:2182:2183;2181,server.2=node3:2182:2183;2181", + reconfigurer.servers()); assertSame(nextConfig, reconfigurer.activeConfig()); } @@ -89,9 +83,9 @@ public class ReconfigurerTest { ZookeeperServerConfig nextConfig = createConfig(5, true); reconfigurer.startOrReconfigure(nextConfig); assertEquals("node1:2181", reconfigurer.connectionSpec()); - assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers()); - assertNull("No servers are leaving", reconfigurer.leavingServers()); - assertEquals(1, reconfigurer.reconfigurations()); + assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181,server.3=node3:2182:2183;2181,server.4=node4:2182:2183;2181", + reconfigurer.servers()); + assertEquals(2, reconfigurer.reconfigurations()); assertSame(nextConfig, reconfigurer.activeConfig()); } @@ -112,24 +106,27 @@ public class ReconfigurerTest { reconfigurer.shutdown(); } - private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration, int... skipIndices) { - Arrays.sort(skipIndices); + private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration, int... retiredIndices) { + Arrays.sort(retiredIndices); ZookeeperServerConfig.Builder builder = new ZookeeperServerConfig.Builder(); builder.zooKeeperConfigFile(cfgFile.getAbsolutePath()); builder.myidFile(idFile.getAbsolutePath()); for (int i = 0, index = 0; i < numberOfServers; i++, index++) { - while (Arrays.binarySearch(skipIndices, index) >= 0) index++; - builder.server(newServer(i, "node" + index)); + boolean retired = Arrays.binarySearch(retiredIndices, index) >= 0; + if (retired) i--; + builder.server(newServer(i, "node" + index, retired)); } + builder.myid(0); builder.dynamicReconfiguration(dynamicReconfiguration); return builder.build(); } - private ZookeeperServerConfig.Server.Builder newServer(int id, String hostName) { + private ZookeeperServerConfig.Server.Builder newServer(int id, String hostName, boolean retired) { ZookeeperServerConfig.Server.Builder builder = new ZookeeperServerConfig.Server.Builder(); builder.id(id); builder.hostname(hostName); + builder.retired(retired); return builder; } @@ -142,6 +139,7 @@ public class ReconfigurerTest { private static class TestableReconfigurer extends Reconfigurer implements VespaZooKeeperServer { private final TestableVespaZooKeeperAdmin zooKeeperAdmin; + private final Phaser phaser = new Phaser(2); private QuorumPeer serverPeer; TestableReconfigurer(TestableVespaZooKeeperAdmin zooKeeperAdmin) { @@ -156,19 +154,16 @@ public class ReconfigurerTest { } void startOrReconfigure(ZookeeperServerConfig newConfig) { - startOrReconfigure(newConfig, this, MockQuorumPeer::new, peer -> serverPeer = peer); + serverPeer = startOrReconfigure(newConfig, this, MockQuorumPeer::new); + phaser.arriveAndDeregister(); } String connectionSpec() { return zooKeeperAdmin.connectionSpec; } - String joiningServers() { - return zooKeeperAdmin.joiningServers; - } - - String leavingServers() { - return zooKeeperAdmin.leavingServers; + String servers() { + return zooKeeperAdmin.servers; } int reconfigurations() { @@ -177,10 +172,14 @@ public class ReconfigurerTest { @Override public void shutdown() { + phaser.arriveAndAwaitAdvance(); serverPeer.shutdown(Duration.ofSeconds(1)); } @Override - public void start(Path configFilePath) { serverPeer.start(configFilePath); } + public void start(Path configFilePath) { + phaser.arriveAndAwaitAdvance(); + serverPeer.start(configFilePath); + } @Override public boolean reconfigurable() { @@ -192,8 +191,7 @@ public class ReconfigurerTest { private static class TestableVespaZooKeeperAdmin implements VespaZooKeeperAdmin { String connectionSpec; - String joiningServers; - String leavingServers; + String servers; int reconfigurations = 0; private int failures = 0; @@ -205,12 +203,11 @@ public class ReconfigurerTest { } @Override - public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException { + public void reconfigure(String connectionSpec, String servers) throws ReconfigException { if (++attempts < failures) throw new ReconfigException("Reconfig failed"); this.connectionSpec = connectionSpec; - this.joiningServers = joiningServers; - this.leavingServers = leavingServers; + this.servers = servers; this.reconfigurations++; } |