diff options
Diffstat (limited to 'zookeeper-server')
4 files changed, 427 insertions, 4 deletions
diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java b/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java new file mode 100644 index 00000000000..eede7239a4c --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java @@ -0,0 +1,213 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.zookeper; + +import com.yahoo.cloud.config.ZookeeperServerConfig; +import com.yahoo.net.HostName; +import com.yahoo.vespa.zookeeper.ReconfigurableVespaZooKeeperServer; +import com.yahoo.vespa.zookeeper.Reconfigurer; +import com.yahoo.vespa.zookeeper.VespaZooKeeperAdminImpl; +import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.admin.ZooKeeperAdmin; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.junit.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; +import static org.junit.Assert.assertEquals; + +public class VespaZooKeeperTest { + + static final Path tempDirRoot = getTmpDir(); + + /** + * Performs dynamic reconfiguration of ZooKeeper servers. + * + * First, a cluster of 3 servers is set up, and some data is written to it. + * Then, 3 new servers are added, and the first 3 marked for retirement; + * this should force the quorum to move the 3 new servers, but not disconnect the old ones. + * Next, the old servers are removed. + * Then, 4 new servers are added. + * Finally, 6 servers are removed. + * + * Throughout all of this, quorum should remain, and the data should remain the same. + */ + @Test + public void testReconfiguration() throws ExecutionException, InterruptedException, IOException, KeeperException, TimeoutException { + List<ZooKeeper> keepers = new ArrayList<>(); + for (int i = 0; i < 8; i++) keepers.add(new ZooKeeper()); + for (int i = 0; i < 8; i++) keepers.get(i).run(); + + // Start the first three servers. + List<ZookeeperServerConfig> configs = getConfigs(0, 0, 3, 0); + for (int i = 0; i < 3; i++) keepers.get(i).config = configs.get(i); + for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Wait for all servers to be up and running. + for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Write data to verify later. + String path = writeData(configs.get(0)); + + // Let three new servers join, causing the three older ones to retire and leave the ensemble. + configs = getConfigs(0, 3, 3, 3); + for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i); + // The existing servers can't reconfigure and leave before the joiners are up. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Wait for new quorum to be established. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Verify written data is preserved. + verifyData(path, configs.get(3)); + + // Old servers are removed. + configs = getConfigs(3, 0, 3, 0); + for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i); + // Old servers shut down, while the newer servers remain. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // Ensure old servers shut down properly. + for (int i = 0; i < 3; i++) keepers.get(i).await(); + // Ensure new servers have reconfigured. + for (int i = 3; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Verify written data is preserved. + verifyData(path, configs.get(3)); + + + // Cluster shrinks to a single server. + // TODO: Consider throwing old dynamic config file when this can never allow quorum with existing servers. + configs = getConfigs(5, 0, 1, 0); + for (int i = 3; i < 6; i++) keepers.get(i).config = configs.get(i); + for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // We let the remaining server reconfigure the others out before they die. + for (int i = 3; i < 5; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 3; i < 5; i++) keepers.get(i).await(); + verifyData(path, configs.get(5)); + + // Cluster grows to 3 servers again. + configs = getConfigs(5, 0, 3, 2); + for (int i = 5; i < 8; i++) keepers.get(i).config = configs.get(i); + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // Wait for the joiners. + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + verifyData(path, configs.get(7)); + + // Let the remaining servers terminate. + for (int i = 5; i < 8; i++) keepers.get(i).config = null; + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 5; i < 8; i++) keepers.get(i).await(); + } + + static String writeData(ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException { + ZooKeeperAdmin admin = createAdmin(config); + List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + String node = admin.create("/test-node", "hi".getBytes(UTF_8), acl, CreateMode.EPHEMERAL_SEQUENTIAL); + String read = new String(admin.getData(node, false, new Stat()), UTF_8); + assertEquals("hi", read); + return node; + } + + static void verifyData(String path, ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException { + ZooKeeperAdmin admin = createAdmin(config); + assertEquals("hi", new String(admin.getData(path, false, new Stat()), UTF_8)); + } + + static ZooKeeperAdmin createAdmin(ZookeeperServerConfig config) throws IOException { + return new ZooKeeperAdmin(HostName.getLocalhost() + ":" + config.clientPort(), + 10_000, + System.err::println, + new ZkClientConfigBuilder().toConfig()); + } + + static class ZooKeeper { + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final Phaser phaser = new Phaser(2); + final AtomicReference<Future<?>> future = new AtomicReference<>(); + ZookeeperServerConfig config; + + void run() { + future.set(executor.submit(() -> { + Reconfigurer reconfigurer = new Reconfigurer(new VespaZooKeeperAdminImpl()); + phaser.arriveAndAwaitAdvance(); + while (config != null) { + new ReconfigurableVespaZooKeeperServer(reconfigurer, config); + phaser.arriveAndAwaitAdvance(); // server is now up, let test thread sync here + phaser.arriveAndAwaitAdvance(); // wait before reconfig/teardown to let test thread do stuff + } + reconfigurer.deconstruct(); + })); + } + + void await() throws ExecutionException, InterruptedException, TimeoutException { + future.get().get(30, SECONDS); + } + } + + static List<ZookeeperServerConfig> getConfigs(int removed, int retired, int active, int joining) { + return IntStream.rangeClosed(1, removed + retired + active) + .mapToObj(id -> getConfig(removed, retired, active, joining, id)) + .collect(toList()); + } + + // Config for server #id among retired + active servers, of which the last may be joining, and with offset removed. + static ZookeeperServerConfig getConfig(int removed, int retired, int active, int joining, int id) { + if (id <= removed) + return null; + + Path tempDir = tempDirRoot.resolve("zookeeper-" + id); + int port = 59267; + return new ZookeeperServerConfig.Builder() + .clientPort(port + 3 * id) + .dataDir(tempDir.toString()) + .zooKeeperConfigFile(tempDir.resolve("zookeeper.cfg").toString()) + .myid(id) + .myidFile(tempDir.resolve("myid").toString()) + .dynamicReconfiguration(true) + .server(IntStream.rangeClosed(removed + 1, removed + retired + active) + .mapToObj(i -> new ZookeeperServerConfig.Server.Builder() + .id(i) + .clientPort(port + 3 * i) + .electionPort(port + 3 * i + 1) + .quorumPort(port + 3 * i + 2) + .hostname("localhost") + .joining(i - removed > retired + active - joining) + .retired(i - removed <= retired)) + .collect(toList())) + .build(); + } + + static Path getTmpDir() { + try { + Path tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "vespa-zk-test"); + tempDir.toFile().deleteOnExit(); + return tempDir.toAbsolutePath(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + +} diff --git a/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java b/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java new file mode 100644 index 00000000000..eede7239a4c --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java @@ -0,0 +1,213 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.zookeper; + +import com.yahoo.cloud.config.ZookeeperServerConfig; +import com.yahoo.net.HostName; +import com.yahoo.vespa.zookeeper.ReconfigurableVespaZooKeeperServer; +import com.yahoo.vespa.zookeeper.Reconfigurer; +import com.yahoo.vespa.zookeeper.VespaZooKeeperAdminImpl; +import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.admin.ZooKeeperAdmin; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.junit.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; +import static org.junit.Assert.assertEquals; + +public class VespaZooKeeperTest { + + static final Path tempDirRoot = getTmpDir(); + + /** + * Performs dynamic reconfiguration of ZooKeeper servers. + * + * First, a cluster of 3 servers is set up, and some data is written to it. + * Then, 3 new servers are added, and the first 3 marked for retirement; + * this should force the quorum to move the 3 new servers, but not disconnect the old ones. + * Next, the old servers are removed. + * Then, 4 new servers are added. + * Finally, 6 servers are removed. + * + * Throughout all of this, quorum should remain, and the data should remain the same. + */ + @Test + public void testReconfiguration() throws ExecutionException, InterruptedException, IOException, KeeperException, TimeoutException { + List<ZooKeeper> keepers = new ArrayList<>(); + for (int i = 0; i < 8; i++) keepers.add(new ZooKeeper()); + for (int i = 0; i < 8; i++) keepers.get(i).run(); + + // Start the first three servers. + List<ZookeeperServerConfig> configs = getConfigs(0, 0, 3, 0); + for (int i = 0; i < 3; i++) keepers.get(i).config = configs.get(i); + for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Wait for all servers to be up and running. + for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Write data to verify later. + String path = writeData(configs.get(0)); + + // Let three new servers join, causing the three older ones to retire and leave the ensemble. + configs = getConfigs(0, 3, 3, 3); + for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i); + // The existing servers can't reconfigure and leave before the joiners are up. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Wait for new quorum to be established. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Verify written data is preserved. + verifyData(path, configs.get(3)); + + // Old servers are removed. + configs = getConfigs(3, 0, 3, 0); + for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i); + // Old servers shut down, while the newer servers remain. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // Ensure old servers shut down properly. + for (int i = 0; i < 3; i++) keepers.get(i).await(); + // Ensure new servers have reconfigured. + for (int i = 3; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Verify written data is preserved. + verifyData(path, configs.get(3)); + + + // Cluster shrinks to a single server. + // TODO: Consider throwing old dynamic config file when this can never allow quorum with existing servers. + configs = getConfigs(5, 0, 1, 0); + for (int i = 3; i < 6; i++) keepers.get(i).config = configs.get(i); + for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // We let the remaining server reconfigure the others out before they die. + for (int i = 3; i < 5; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 3; i < 5; i++) keepers.get(i).await(); + verifyData(path, configs.get(5)); + + // Cluster grows to 3 servers again. + configs = getConfigs(5, 0, 3, 2); + for (int i = 5; i < 8; i++) keepers.get(i).config = configs.get(i); + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // Wait for the joiners. + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + verifyData(path, configs.get(7)); + + // Let the remaining servers terminate. + for (int i = 5; i < 8; i++) keepers.get(i).config = null; + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 5; i < 8; i++) keepers.get(i).await(); + } + + static String writeData(ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException { + ZooKeeperAdmin admin = createAdmin(config); + List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + String node = admin.create("/test-node", "hi".getBytes(UTF_8), acl, CreateMode.EPHEMERAL_SEQUENTIAL); + String read = new String(admin.getData(node, false, new Stat()), UTF_8); + assertEquals("hi", read); + return node; + } + + static void verifyData(String path, ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException { + ZooKeeperAdmin admin = createAdmin(config); + assertEquals("hi", new String(admin.getData(path, false, new Stat()), UTF_8)); + } + + static ZooKeeperAdmin createAdmin(ZookeeperServerConfig config) throws IOException { + return new ZooKeeperAdmin(HostName.getLocalhost() + ":" + config.clientPort(), + 10_000, + System.err::println, + new ZkClientConfigBuilder().toConfig()); + } + + static class ZooKeeper { + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final Phaser phaser = new Phaser(2); + final AtomicReference<Future<?>> future = new AtomicReference<>(); + ZookeeperServerConfig config; + + void run() { + future.set(executor.submit(() -> { + Reconfigurer reconfigurer = new Reconfigurer(new VespaZooKeeperAdminImpl()); + phaser.arriveAndAwaitAdvance(); + while (config != null) { + new ReconfigurableVespaZooKeeperServer(reconfigurer, config); + phaser.arriveAndAwaitAdvance(); // server is now up, let test thread sync here + phaser.arriveAndAwaitAdvance(); // wait before reconfig/teardown to let test thread do stuff + } + reconfigurer.deconstruct(); + })); + } + + void await() throws ExecutionException, InterruptedException, TimeoutException { + future.get().get(30, SECONDS); + } + } + + static List<ZookeeperServerConfig> getConfigs(int removed, int retired, int active, int joining) { + return IntStream.rangeClosed(1, removed + retired + active) + .mapToObj(id -> getConfig(removed, retired, active, joining, id)) + .collect(toList()); + } + + // Config for server #id among retired + active servers, of which the last may be joining, and with offset removed. + static ZookeeperServerConfig getConfig(int removed, int retired, int active, int joining, int id) { + if (id <= removed) + return null; + + Path tempDir = tempDirRoot.resolve("zookeeper-" + id); + int port = 59267; + return new ZookeeperServerConfig.Builder() + .clientPort(port + 3 * id) + .dataDir(tempDir.toString()) + .zooKeeperConfigFile(tempDir.resolve("zookeeper.cfg").toString()) + .myid(id) + .myidFile(tempDir.resolve("myid").toString()) + .dynamicReconfiguration(true) + .server(IntStream.rangeClosed(removed + 1, removed + retired + active) + .mapToObj(i -> new ZookeeperServerConfig.Server.Builder() + .id(i) + .clientPort(port + 3 * i) + .electionPort(port + 3 * i + 1) + .quorumPort(port + 3 * i + 2) + .hostname("localhost") + .joining(i - removed > retired + active - joining) + .retired(i - removed <= retired)) + .collect(toList())) + .build(); + } + + static Path getTmpDir() { + try { + Path tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "vespa-zk-test"); + tempDir.toFile().deleteOnExit(); + return tempDir.toAbsolutePath(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + +} diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java index c65035106f1..5157dd5d59c 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java @@ -130,7 +130,7 @@ public class Configurator { .append("observer"); } sb.append(";") - .append(clientPort); + .append(server.clientPort()); return sb.toString(); } diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java index 77811dd6bee..604419c063d 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java @@ -86,10 +86,7 @@ public class Reconfigurer extends AbstractComponent { // TODO jonmv: read dynamic file, discard if old quorum impossible (config file + .dynamic.<id>) // TODO jonmv: if dynamic file, all unlisted servers are observers; otherwise joiners are observers - // TODO jonmv: verify reconfig by issuing a dummy write // TODO jonmv: wrap Curator in Provider, for Curator shutdown - // TODO jonmv: scale down to 1 server as well - // TODO jonmv: unit test this private void reconfigure(ZookeeperServerConfig newConfig) { Instant reconfigTriggered = Instant.now(); String newServers = String.join(",", servers(newConfig)); |